Support alter from String to JSON

avogar 2024-10-07 06:53:12 +00:00
parent d73de82218
commit 7808f00857
30 changed files with 459 additions and 110 deletions
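
In short, after this change a String column containing JSON-formatted values can be converted in place to the new JSON type with ALTER TABLE ... MODIFY COLUMN. A minimal usage sketch (table and column names are illustrative, modelled on the tests added at the end of this diff):

set allow_experimental_json_type = 1;
create table test (x UInt64, json String) engine=MergeTree order by x;
insert into test select number, toJSONString(map('key' || toString(number % 2), 'value' || toString(number))) from numbers(1000);
-- Previously rejected during ALTER validation; now the String column is rewritten as JSON.
alter table test modify column json JSON settings mutations_sync=1;
select json.key0 from test order by x limit 3;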

View File

@ -192,6 +192,13 @@ public:
bool hasDynamicStructure() const override { return getData().hasDynamicStructure(); }
void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override;
bool dynamicStructureEquals(const IColumn & rhs) const override
{
if (const auto * rhs_concrete = typeid_cast<const ColumnArray *>(&rhs))
return data->dynamicStructureEquals(*rhs_concrete->data);
return false;
}
private:
WrappedPtr data;
WrappedPtr offsets;

View File

@ -1153,6 +1153,15 @@ void ColumnDynamic::prepareVariantsForSquashing(const Columns & source_columns)
}
}
bool ColumnDynamic::dynamicStructureEquals(const IColumn & rhs) const
{
if (const auto * rhs_concrete = typeid_cast<const ColumnDynamic *>(&rhs))
return max_dynamic_types == rhs_concrete->max_dynamic_types && global_max_dynamic_types == rhs_concrete->global_max_dynamic_types
&& variant_info.variant_name == rhs_concrete->variant_info.variant_name
&& variant_column->dynamicStructureEquals(*rhs_concrete->variant_column);
return false;
}
void ColumnDynamic::takeDynamicStructureFromSourceColumns(const Columns & source_columns)
{
if (!empty())

View File

@ -367,6 +367,7 @@ public:
bool addNewVariant(const DataTypePtr & new_variant) { return addNewVariant(new_variant, new_variant->getName()); }
bool hasDynamicStructure() const override { return true; }
bool dynamicStructureEquals(const IColumn & rhs) const override;
void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override;
const StatisticsPtr & getStatistics() const { return statistics; }

View File

@ -330,6 +330,13 @@ bool ColumnMap::structureEquals(const IColumn & rhs) const
return false;
}
bool ColumnMap::dynamicStructureEquals(const IColumn & rhs) const
{
if (const auto * rhs_map = typeid_cast<const ColumnMap *>(&rhs))
return nested->dynamicStructureEquals(*rhs_map->nested);
return false;
}
ColumnPtr ColumnMap::compress() const
{
auto compressed = nested->compress();

View File

@ -120,6 +120,7 @@ public:
ColumnPtr compress() const override;
bool hasDynamicStructure() const override { return nested->hasDynamicStructure(); }
bool dynamicStructureEquals(const IColumn & rhs) const override;
void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override;
};

View File

@ -1299,6 +1299,31 @@ void ColumnObject::prepareForSquashing(const std::vector<ColumnPtr> & source_col
}
}
bool ColumnObject::dynamicStructureEquals(const IColumn & rhs) const
{
const auto * rhs_object = typeid_cast<const ColumnObject *>(&rhs);
if (!rhs_object || typed_paths.size() != rhs_object->typed_paths.size()
|| global_max_dynamic_paths != rhs_object->global_max_dynamic_paths || max_dynamic_types != rhs_object->max_dynamic_types
|| dynamic_paths.size() != rhs_object->dynamic_paths.size())
return false;
for (const auto & [path, column] : typed_paths)
{
auto it = rhs_object->typed_paths.find(path);
if (it == rhs_object->typed_paths.end() || !it->second->dynamicStructureEquals(*column))
return false;
}
for (const auto & [path, column] : dynamic_paths)
{
auto it = rhs_object->dynamic_paths.find(path);
if (it == rhs_object->dynamic_paths.end() || !it->second->dynamicStructureEquals(*column))
return false;
}
return true;
}
void ColumnObject::takeDynamicStructureFromSourceColumns(const DB::Columns & source_columns)
{
if (!empty())

View File

@ -172,6 +172,7 @@ public:
bool isFinalized() const override;
bool hasDynamicStructure() const override { return true; }
bool dynamicStructureEquals(const IColumn & rhs) const override;
void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override;
const PathToColumnMap & getTypedPaths() const { return typed_paths; }
@ -221,6 +222,7 @@ public:
void setDynamicPaths(const std::vector<String> & paths);
void setMaxDynamicPaths(size_t max_dynamic_paths_);
void setGlobalMaxDynamicPaths(size_t global_max_dynamic_paths_);
void setStatistics(const StatisticsPtr & statistics_) { statistics = statistics_; }
void serializePathAndValueIntoSharedData(ColumnString * shared_data_paths, ColumnString * shared_data_values, std::string_view path, const IColumn & column, size_t n);

View File

@ -727,6 +727,26 @@ bool ColumnTuple::hasDynamicStructure() const
return false;
}
bool ColumnTuple::dynamicStructureEquals(const IColumn & rhs) const
{
if (const auto * rhs_tuple = typeid_cast<const ColumnTuple *>(&rhs))
{
const size_t tuple_size = columns.size();
if (tuple_size != rhs_tuple->columns.size())
return false;
for (size_t i = 0; i < tuple_size; ++i)
if (!columns[i]->dynamicStructureEquals(*rhs_tuple->columns[i]))
return false;
return true;
}
else
{
return false;
}
}
void ColumnTuple::takeDynamicStructureFromSourceColumns(const Columns & source_columns)
{
std::vector<Columns> nested_source_columns;

View File

@ -138,6 +138,7 @@ public:
ColumnPtr & getColumnPtr(size_t idx) { return columns[idx]; }
bool hasDynamicStructure() const override;
bool dynamicStructureEquals(const IColumn & rhs) const override;
void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override;
/// Empty tuple needs a public method to manage its size.

View File

@ -1376,6 +1376,23 @@ bool ColumnVariant::structureEquals(const IColumn & rhs) const
return true;
}
bool ColumnVariant::dynamicStructureEquals(const IColumn & rhs) const
{
const auto * rhs_variant = typeid_cast<const ColumnVariant *>(&rhs);
if (!rhs_variant)
return false;
const size_t num_variants = variants.size();
if (num_variants != rhs_variant->variants.size())
return false;
for (size_t i = 0; i < num_variants; ++i)
if (!variants[i]->dynamicStructureEquals(rhs_variant->getVariantByGlobalDiscriminator(globalDiscriminatorByLocal(i))))
return false;
return true;
}
ColumnPtr ColumnVariant::compress() const
{
ColumnPtr local_discriminators_compressed = local_discriminators->compress();

View File

@ -327,6 +327,7 @@ public:
void extend(const std::vector<Discriminator> & old_to_new_global_discriminators, std::vector<std::pair<MutableColumnPtr, Discriminator>> && new_variants_and_discriminators);
bool hasDynamicStructure() const override;
bool dynamicStructureEquals(const IColumn & rhs) const override;
void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override;
private:

View File

@ -590,6 +590,9 @@ public:
/// Checks if column has dynamic subcolumns.
virtual bool hasDynamicStructure() const { return false; }
/// For columns with dynamic subcolumns checks if columns have equal dynamic structure.
[[nodiscard]] virtual bool dynamicStructureEquals(const IColumn & rhs) const { return structureEquals(rhs); }
/// For columns with dynamic subcolumns, this method takes the dynamic structure from the source columns
/// and creates the proper resulting dynamic structure in advance for merging these source columns.
virtual void takeDynamicStructureFromSourceColumns(const std::vector<Ptr> & /*source_columns*/) {}
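
A minimal sketch of how these two methods are meant to be used together (the free-standing helper and its name are hypothetical; the real logic added by this commit lives in MergeTreeDataPartWriterOnDisk::initOrAdjustDynamicStructureIfNeeded further below):

#include <Core/ColumnWithTypeAndName.h>

namespace DB
{

/// Bring a column's dynamic structure in line with a sample column from an earlier block.
void adjustDynamicStructureToSample(ColumnWithTypeAndName & column, const ColumnWithTypeAndName & sample)
{
    /// Nothing to do for columns without dynamic subcolumns or when the structures already match.
    if (!column.type->hasDynamicSubcolumns() || column.column->dynamicStructureEquals(*sample.column))
        return;

    /// Rebuild the column: take the dynamic structure from the sample and reinsert the data.
    auto new_column = sample.type->createColumn();
    new_column->takeDynamicStructureFromSourceColumns({sample.column});
    new_column->insertRangeFrom(*column.column, 0, column.column->size());
    column.column = std::move(new_column);
}

}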

View File

@ -26,8 +26,8 @@ namespace ErrorCodes
struct SerializeBinaryBulkStateDynamic : public ISerialization::SerializeBinaryBulkState
{
SerializationDynamic::DynamicStructureSerializationVersion structure_version;
size_t max_dynamic_types;
SerializationDynamic::DynamicSerializationVersion structure_version;
size_t num_dynamic_types;
DataTypePtr variant_type;
Names variant_names;
SerializationPtr variant_serialization;
@ -81,14 +81,14 @@ void SerializationDynamic::enumerateStreams(
settings.path.pop_back();
}
SerializationDynamic::DynamicStructureSerializationVersion::DynamicStructureSerializationVersion(UInt64 version) : value(static_cast<Value>(version))
SerializationDynamic::DynamicSerializationVersion::DynamicSerializationVersion(UInt64 version) : value(static_cast<Value>(version))
{
checkVersion(version);
}
void SerializationDynamic::DynamicStructureSerializationVersion::checkVersion(UInt64 version)
void SerializationDynamic::DynamicSerializationVersion::checkVersion(UInt64 version)
{
if (version != VariantTypeName)
if (version != V1 && version != V2)
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid version for Dynamic structure serialization.");
}
@ -108,22 +108,17 @@ void SerializationDynamic::serializeBinaryBulkStatePrefix(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing stream for Dynamic column structure during serialization of binary bulk state prefix");
/// Write structure serialization version.
UInt64 structure_version = DynamicStructureSerializationVersion::Value::VariantTypeName;
UInt64 structure_version = DynamicSerializationVersion::Value::V2;
writeBinaryLittleEndian(structure_version, *stream);
auto dynamic_state = std::make_shared<SerializeBinaryBulkStateDynamic>(structure_version);
dynamic_state->max_dynamic_types = column_dynamic.getMaxDynamicTypes();
/// Write max_dynamic_types parameter, because it can differ from the max_dynamic_types
/// that is specified in the Dynamic type (we could decrease it before merge).
writeVarUInt(dynamic_state->max_dynamic_types, *stream);
dynamic_state->variant_type = variant_info.variant_type;
dynamic_state->variant_names = variant_info.variant_names;
const auto & variant_column = column_dynamic.getVariantColumn();
/// Write information about variants.
size_t num_variants = dynamic_state->variant_names.size() - 1; /// Don't write shared variant, Dynamic column should always have it.
writeVarUInt(num_variants, *stream);
/// Write information about dynamic types.
dynamic_state->num_dynamic_types = dynamic_state->variant_names.size() - 1; /// -1 for SharedVariant
writeVarUInt(dynamic_state->num_dynamic_types, *stream);
if (settings.data_types_binary_encoding)
{
const auto & variants = assert_cast<const DataTypeVariant &>(*dynamic_state->variant_type).getVariants();
@ -251,22 +246,25 @@ ISerialization::DeserializeBinaryBulkStatePtr SerializationDynamic::deserializeD
UInt64 structure_version;
readBinaryLittleEndian(structure_version, *structure_stream);
auto structure_state = std::make_shared<DeserializeBinaryBulkStateDynamicStructure>(structure_version);
/// Read max_dynamic_types parameter.
readVarUInt(structure_state->max_dynamic_types, *structure_stream);
if (structure_state->structure_version.value == DynamicSerializationVersion::Value::V1)
{
/// Skip max_dynamic_types parameter in V1 serialization version.
size_t max_dynamic_types;
readVarUInt(max_dynamic_types, *structure_stream);
}
/// Read information about variants.
DataTypes variants;
size_t num_variants;
readVarUInt(num_variants, *structure_stream);
variants.reserve(num_variants + 1); /// +1 for shared variant.
readVarUInt(structure_state->num_dynamic_types, *structure_stream);
variants.reserve(structure_state->num_dynamic_types + 1); /// +1 for shared variant.
if (settings.data_types_binary_encoding)
{
for (size_t i = 0; i != num_variants; ++i)
for (size_t i = 0; i != structure_state->num_dynamic_types; ++i)
variants.push_back(decodeDataType(*structure_stream));
}
else
{
String data_type_name;
for (size_t i = 0; i != num_variants; ++i)
for (size_t i = 0; i != structure_state->num_dynamic_types; ++i)
{
readStringBinary(data_type_name, *structure_stream);
variants.push_back(DataTypeFactory::instance().get(data_type_name));
@ -364,9 +362,6 @@ void SerializationDynamic::serializeBinaryBulkWithMultipleStreamsAndCountTotalSi
if (!variant_info.variant_type->equals(*dynamic_state->variant_type))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mismatch of internal columns of Dynamic. Expected: {}, Got: {}", dynamic_state->variant_type->getName(), variant_info.variant_type->getName());
if (column_dynamic.getMaxDynamicTypes() != dynamic_state->max_dynamic_types)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mismatch of max_dynamic_types parameter of Dynamic. Expected: {}, Got: {}", dynamic_state->max_dynamic_types, column_dynamic.getMaxDynamicTypes());
settings.path.push_back(Substream::DynamicData);
assert_cast<const SerializationVariant &>(*dynamic_state->variant_serialization)
.serializeBinaryBulkWithMultipleStreamsAndUpdateVariantStatistics(
@ -424,7 +419,7 @@ void SerializationDynamic::deserializeBinaryBulkWithMultipleStreams(
if (mutable_column->empty())
{
column_dynamic.setMaxDynamicPaths(structure_state->max_dynamic_types);
column_dynamic.setMaxDynamicPaths(structure_state->num_dynamic_types);
column_dynamic.setVariantType(structure_state->variant_type);
column_dynamic.setStatistics(structure_state->statistics);
}

View File

@ -16,18 +16,28 @@ public:
{
}
struct DynamicStructureSerializationVersion
struct DynamicSerializationVersion
{
enum Value
{
VariantTypeName = 1,
/// V1 serialization:
/// - DynamicStructure stream:
/// <max_dynamic_types parameter>
/// <actual number of dynamic types>
/// <list of dynamic types (list of variants in nested Variant column without SharedVariant)>
/// <statistics with number of values for each dynamic type> (only in MergeTree serialization)
/// <statistics with number of values for some types in SharedVariant> (only in MergeTree serialization)
/// - DynamicData stream: contains the data of nested Variant column.
V1 = 1,
/// V2 serialization: the same as V1 but without max_dynamic_types parameter in DynamicStructure stream.
V2 = 2,
};
Value value;
static void checkVersion(UInt64 version);
explicit DynamicStructureSerializationVersion(UInt64 version);
explicit DynamicSerializationVersion(UInt64 version);
};
void enumerateStreams(
@ -113,9 +123,9 @@ private:
struct DeserializeBinaryBulkStateDynamicStructure : public ISerialization::DeserializeBinaryBulkState
{
DynamicStructureSerializationVersion structure_version;
DynamicSerializationVersion structure_version;
DataTypePtr variant_type;
size_t max_dynamic_types;
size_t num_dynamic_types;
ColumnDynamic::StatisticsPtr statistics;
explicit DeserializeBinaryBulkStateDynamicStructure(UInt64 structure_version_)

View File

@ -68,14 +68,13 @@ SerializationObject::ObjectSerializationVersion::ObjectSerializationVersion(UInt
void SerializationObject::ObjectSerializationVersion::checkVersion(UInt64 version)
{
if (version != BASIC)
if (version != V1 && version != V2)
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid version for Object structure serialization.");
}
struct SerializeBinaryBulkStateObject: public ISerialization::SerializeBinaryBulkState
{
SerializationObject::ObjectSerializationVersion serialization_version;
size_t max_dynamic_paths;
std::vector<String> sorted_dynamic_paths;
std::unordered_map<String, ISerialization::SerializeBinaryBulkStatePtr> typed_path_states;
std::unordered_map<String, ISerialization::SerializeBinaryBulkStatePtr> dynamic_path_states;
@ -193,13 +192,10 @@ void SerializationObject::serializeBinaryBulkStatePrefix(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing stream for Object column structure during serialization of binary bulk state prefix");
/// Write serialization version.
UInt64 serialization_version = ObjectSerializationVersion::Value::BASIC;
UInt64 serialization_version = ObjectSerializationVersion::Value::V2;
writeBinaryLittleEndian(serialization_version, *stream);
auto object_state = std::make_shared<SerializeBinaryBulkStateObject>(serialization_version);
object_state->max_dynamic_paths = column_object.getMaxDynamicPaths();
/// Write max_dynamic_paths parameter.
writeVarUInt(object_state->max_dynamic_paths, *stream);
/// Write all dynamic paths in sorted order.
object_state->sorted_dynamic_paths.reserve(dynamic_paths.size());
for (const auto & [path, _] : dynamic_paths)
@ -353,8 +349,13 @@ ISerialization::DeserializeBinaryBulkStatePtr SerializationObject::deserializeOb
UInt64 serialization_version;
readBinaryLittleEndian(serialization_version, *structure_stream);
auto structure_state = std::make_shared<DeserializeBinaryBulkStateObjectStructure>(serialization_version);
/// Read max_dynamic_paths parameter.
readVarUInt(structure_state->max_dynamic_paths, *structure_stream);
if (structure_state->structure_version.value == ObjectSerializationVersion::Value::V1)
{
/// Skip max_dynamic_paths parameter in V1 serialization version.
size_t max_dynamic_paths;
readVarUInt(max_dynamic_paths, *structure_stream);
}
/// Read the sorted list of dynamic paths.
size_t dynamic_paths_size;
readVarUInt(dynamic_paths_size, *structure_stream);
@ -411,9 +412,6 @@ void SerializationObject::serializeBinaryBulkWithMultipleStreams(
const auto & shared_data = column_object.getSharedDataPtr();
auto * object_state = checkAndGetState<SerializeBinaryBulkStateObject>(state);
if (column_object.getMaxDynamicPaths() != object_state->max_dynamic_paths)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mismatch of max_dynamic_paths parameter of Object. Expected: {}, Got: {}", object_state->max_dynamic_paths, column_object.getMaxDynamicPaths());
if (column_object.getDynamicPaths().size() != object_state->sorted_dynamic_paths.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mismatch of number of dynamic paths in Object. Expected: {}, Got: {}", object_state->sorted_dynamic_paths.size(), column_object.getDynamicPaths().size());
@ -538,7 +536,7 @@ void SerializationObject::deserializeBinaryBulkWithMultipleStreams(
/// If it's a new object column, set dynamic paths and statistics.
if (column_object.empty())
{
column_object.setMaxDynamicPaths(structure_state->max_dynamic_paths);
column_object.setMaxDynamicPaths(structure_state->sorted_dynamic_paths.size());
column_object.setDynamicPaths(structure_state->sorted_dynamic_paths);
column_object.setStatistics(structure_state->statistics);
}

View File

@ -19,7 +19,20 @@ public:
{
enum Value
{
BASIC = 0,
/// V1 serialization:
/// - ObjectStructure stream:
/// <max_dynamic_paths parameter>
/// <actual number of dynamic paths>
/// <sorted list of dynamic paths>
/// <statistics with number of non-null values for dynamic paths> (only in MergeTree serialization)
/// <statistics with number of non-null values for some paths in shared data> (only in MergeTree serialization)
/// - ObjectData stream:
/// - ObjectTypedPath stream for each column in typed paths
/// - ObjectDynamicPath stream for each column in dynamic paths
/// - ObjectSharedData stream with the shared data column.
V1 = 0,
/// V2 serialization: the same as V1 but without max_dynamic_paths parameter in ObjectStructure stream.
V2 = 2,
};
Value value;
@ -82,7 +95,6 @@ private:
struct DeserializeBinaryBulkStateObjectStructure : public ISerialization::DeserializeBinaryBulkState
{
ObjectSerializationVersion structure_version;
size_t max_dynamic_paths;
std::vector<String> sorted_dynamic_paths;
std::unordered_set<String> dynamic_paths;
/// Paths statistics. Map (dynamic path) -> (number of non-null values in this path).

View File

@ -83,6 +83,7 @@ namespace Setting
extern const SettingsBool input_format_ipv4_default_on_conversion_error;
extern const SettingsBool input_format_ipv6_default_on_conversion_error;
extern const SettingsBool precise_float_parsing;
extern const SettingsBool cast_to_json_disable_dynamic_subcolumns;
}
namespace ErrorCodes
@ -4056,9 +4057,7 @@ private:
{
return [this](ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable * nullable_source, size_t input_rows_count)
{
auto res = ConvertImplGenericFromString<true>::execute(arguments, result_type, nullable_source, input_rows_count, context)->assumeMutable();
res->finalize();
return res;
return ConvertImplGenericFromString<true>::execute(arguments, result_type, nullable_source, input_rows_count, context)->assumeMutable();
};
}

View File

@ -1466,13 +1466,13 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
"The change of data type {} of column {} to {} is not allowed. It has known bugs",
old_data_type->getName(), backQuote(column_name), command.data_type->getName());
bool has_object_type = isObject(command.data_type);
command.data_type->forEachChild([&](const IDataType & type){ has_object_type |= isObject(type); });
if (has_object_type)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"The change of data type {} of column {} to {} is not supported.",
old_data_type->getName(), backQuote(column_name), command.data_type->getName());
// bool has_object_type = isObject(command.data_type);
// command.data_type->forEachChild([&](const IDataType & type){ has_object_type |= isObject(type); });
// if (has_object_type)
// throw Exception(
// ErrorCodes::BAD_ARGUMENTS,
// "The change of data type {} of column {} to {} is not supported.",
// old_data_type->getName(), backQuote(column_name), command.data_type->getName());
}
if (command.isRemovingProperty())

View File

@ -61,22 +61,6 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
}
}
void MergeTreeDataPartWriterCompact::initDynamicStreamsIfNeeded(const Block & block)
{
if (is_dynamic_streams_initialized)
return;
is_dynamic_streams_initialized = true;
for (const auto & column : columns_list)
{
if (column.type->hasDynamicSubcolumns())
{
auto compression = getCodecDescOrDefault(column.name, default_codec);
addStreams(column, block.getByName(column.name).column, compression);
}
}
}
void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & name_and_type, const ColumnPtr & column, const ASTPtr & effective_codec_desc)
{
ISerialization::StreamCallback callback = [&](const auto & substream_path)
@ -175,20 +159,25 @@ void writeColumnSingleGranule(
void MergeTreeDataPartWriterCompact::write(const Block & block, const IColumn::Permutation * permutation)
{
/// On first block of data initialize streams for dynamic subcolumns.
initDynamicStreamsIfNeeded(block);
Block result_block = block;
/// During serialization, columns with dynamic subcolumns (like JSON/Dynamic) must have the same dynamic structure.
/// But it may happen that they don't (for example, during ALTER MODIFY COLUMN from some type to JSON/Dynamic).
/// In this case we use the dynamic structure of the column from the first written block and adjust the columns from
/// the next blocks so they match this dynamic structure.
initOrAdjustDynamicStructureIfNeeded(result_block);
/// Fill index granularity for this block
/// if it's unknown (in case of insert data or horizontal merge,
/// but not in case of vertical merge)
if (compute_granularity)
{
size_t index_granularity_for_block = computeIndexGranularity(block);
size_t index_granularity_for_block = computeIndexGranularity(result_block);
assert(index_granularity_for_block >= 1);
fillIndexGranularity(index_granularity_for_block, block.rows());
fillIndexGranularity(index_granularity_for_block, result_block.rows());
}
Block result_block = permuteBlockIfNeeded(block, permutation);
result_block = permuteBlockIfNeeded(result_block, permutation);
if (!header)
header = result_block.cloneEmpty();

View File

@ -48,9 +48,7 @@ private:
void addToChecksums(MergeTreeDataPartChecksums & checksums);
void addStreams(const NameAndTypePair & name_and_type, const ColumnPtr & column, const ASTPtr & effective_codec_desc);
void initDynamicStreamsIfNeeded(const Block & block);
void addStreams(const NameAndTypePair & name_and_type, const ColumnPtr & column, const ASTPtr & effective_codec_desc) override;
Block header;
@ -104,8 +102,6 @@ private:
/// then finally to 'marks_file'.
std::unique_ptr<CompressedWriteBuffer> marks_compressor;
std::unique_ptr<HashingWriteBuffer> marks_source_hashing;
bool is_dynamic_streams_initialized = false;
};
}

View File

@ -557,6 +557,45 @@ Names MergeTreeDataPartWriterOnDisk::getSkipIndicesColumns() const
return Names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());
}
void MergeTreeDataPartWriterOnDisk::initOrAdjustDynamicStructureIfNeeded(Block & block)
{
if (!is_dynamic_streams_initialized)
{
for (const auto & column : columns_list)
{
if (column.type->hasDynamicSubcolumns())
{
/// Create all streams for dynamic subcolumns using dynamic structure from block.
auto compression = getCodecDescOrDefault(column.name, default_codec);
addStreams(column, block.getByName(column.name).column, compression);
}
}
is_dynamic_streams_initialized = true;
block_sample = block.cloneEmpty();
}
else
{
size_t size = block.columns();
for (size_t i = 0; i != size; ++i)
{
auto & column = block.getByPosition(i);
const auto & sample_column = block_sample.getByPosition(i);
/// Check if the dynamic structure of this column is different from the sample column.
if (column.type->hasDynamicSubcolumns() && !column.column->dynamicStructureEquals(*sample_column.column))
{
/// We need to change the dynamic structure of the column so it matches the sample column.
/// To do this, we create an empty column of this type, take the dynamic structure from the sample column
/// and insert the data into it. The resulting column will have the required dynamic structure and the content
/// of the column from the current block.
auto new_column = sample_column.type->createColumn();
new_column->takeDynamicStructureFromSourceColumns({sample_column.column});
new_column->insertRangeFrom(*column.column, 0, column.column->size());
column.column = std::move(new_column);
}
}
}
}
template struct MergeTreeDataPartWriterOnDisk::Stream<false>;
template struct MergeTreeDataPartWriterOnDisk::Stream<true>;

View File

@ -153,6 +153,14 @@ protected:
/// Get unique non ordered skip indices column.
Names getSkipIndicesColumns() const;
virtual void addStreams(const NameAndTypePair & name_and_type, const ColumnPtr & column, const ASTPtr & effective_codec_desc) = 0;
/// On the first block, create all required streams for columns with dynamic subcolumns and remember the block sample.
/// On each next block, check whether the dynamic structure of these columns equals the dynamic structure of the same
/// columns in the sample block. If the dynamic structure of some column is different, adjust it so it matches
/// the structure from the sample.
void initOrAdjustDynamicStructureIfNeeded(Block & block);
const MergeTreeIndices skip_indices;
const ColumnsStatistics stats;
@ -187,6 +195,10 @@ protected:
size_t current_mark = 0;
GinIndexStoreFactory::GinIndexStores gin_index_stores;
bool is_dynamic_streams_initialized = false;
Block block_sample;
private:
void initSkipIndices();
void initPrimaryIndex();

View File

@ -106,23 +106,6 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
}
}
void MergeTreeDataPartWriterWide::initDynamicStreamsIfNeeded(const DB::Block & block)
{
if (is_dynamic_streams_initialized)
return;
is_dynamic_streams_initialized = true;
block_sample = block.cloneEmpty();
for (const auto & column : columns_list)
{
if (column.type->hasDynamicSubcolumns())
{
auto compression = getCodecDescOrDefault(column.name, default_codec);
addStreams(column, block_sample.getByName(column.name).column, compression);
}
}
}
void MergeTreeDataPartWriterWide::addStreams(
const NameAndTypePair & name_and_type,
const ColumnPtr & column,
@ -260,15 +243,20 @@ void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_wri
void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Permutation * permutation)
{
/// On first block of data initialize streams for dynamic subcolumns.
initDynamicStreamsIfNeeded(block);
Block block_to_write = block;
/// During serialization, columns with dynamic subcolumns (like JSON/Dynamic) must have the same dynamic structure.
/// But it may happen that they don't (for example, during ALTER MODIFY COLUMN from some type to JSON/Dynamic).
/// In this case we use the dynamic structure of the column from the first written block and adjust the columns from
/// the next blocks so they match this dynamic structure.
initOrAdjustDynamicStructureIfNeeded(block_to_write);
/// Fill index granularity for this block
/// if it's unknown (in case of insert data or horizontal merge,
/// but not in case of vertical part of vertical merge)
if (compute_granularity)
{
size_t index_granularity_for_block = computeIndexGranularity(block);
size_t index_granularity_for_block = computeIndexGranularity(block_to_write);
if (rows_written_in_last_mark > 0)
{
size_t rows_left_in_last_mark = index_granularity.getMarkRows(getCurrentMark()) - rows_written_in_last_mark;
@ -286,11 +274,9 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
}
}
fillIndexGranularity(index_granularity_for_block, block.rows());
fillIndexGranularity(index_granularity_for_block, block_to_write.rows());
}
Block block_to_write = block;
auto granules_to_write = getGranulesToWrite(index_granularity, block_to_write.rows(), getCurrentMark(), rows_written_in_last_mark);
auto offset_columns = written_offset_columns ? *written_offset_columns : WrittenOffsetColumns{};

View File

@ -91,9 +91,7 @@ private:
void addStreams(
const NameAndTypePair & name_and_type,
const ColumnPtr & column,
const ASTPtr & effective_codec_desc);
void initDynamicStreamsIfNeeded(const Block & block);
const ASTPtr & effective_codec_desc) override;
/// Method for self check (used in debug-build only). Checks that written
/// data and corresponding marks are consistent. Otherwise throws logical
@ -139,10 +137,6 @@ private:
/// How many rows we have already written in the current mark.
/// More than zero when incoming blocks are smaller than their granularity.
size_t rows_written_in_last_mark = 0;
Block block_sample;
bool is_dynamic_streams_initialized = false;
};
}

View File

@ -0,0 +1,134 @@
All paths:
['key0','key1','key2','key3','key4','key5']
Shared data paths:
key2
key3
key4
key5
{"key0":"value0"}
{"key1":"value1"}
{"key0":"value2"}
{"key1":"value3"}
{"key0":"value4"}
{"key1":"value5"}
{"key0":"value6"}
{"key1":"value7"}
{"key0":"value8"}
{"key1":"value9"}
{"key2":"value300000"}
{"key3":"value300001"}
{"key2":"value300002"}
{"key3":"value300003"}
{"key2":"value300004"}
{"key3":"value300005"}
{"key2":"value300006"}
{"key3":"value300007"}
{"key2":"value300008"}
{"key3":"value300009"}
{"key4":"value600000"}
{"key5":"value600001"}
{"key4":"value600002"}
{"key5":"value600003"}
{"key4":"value600004"}
{"key5":"value600005"}
{"key4":"value600006"}
{"key5":"value600007"}
{"key4":"value600008"}
{"key5":"value600009"}
value0 \N \N \N \N \N
\N value1 \N \N \N \N
value2 \N \N \N \N \N
\N value3 \N \N \N \N
value4 \N \N \N \N \N
\N value5 \N \N \N \N
value6 \N \N \N \N \N
\N value7 \N \N \N \N
value8 \N \N \N \N \N
\N value9 \N \N \N \N
\N \N value300000 \N \N \N
\N \N \N value300001 \N \N
\N \N value300002 \N \N \N
\N \N \N value300003 \N \N
\N \N value300004 \N \N \N
\N \N \N value300005 \N \N
\N \N value300006 \N \N \N
\N \N \N value300007 \N \N
\N \N value300008 \N \N \N
\N \N \N value300009 \N \N
\N \N \N \N value600000 \N
\N \N \N \N \N value600001
\N \N \N \N value600002 \N
\N \N \N \N \N value600003
\N \N \N \N value600004 \N
\N \N \N \N \N value600005
\N \N \N \N value600006 \N
\N \N \N \N \N value600007
\N \N \N \N value600008 \N
\N \N \N \N \N value600009
All paths:
['key0','key1','key2','key3','key4','key5']
Shared data paths:
key2
key3
key4
key5
{"key0":"value0"}
{"key1":"value1"}
{"key0":"value2"}
{"key1":"value3"}
{"key0":"value4"}
{"key1":"value5"}
{"key0":"value6"}
{"key1":"value7"}
{"key0":"value8"}
{"key1":"value9"}
{"key2":"value300000"}
{"key3":"value300001"}
{"key2":"value300002"}
{"key3":"value300003"}
{"key2":"value300004"}
{"key3":"value300005"}
{"key2":"value300006"}
{"key3":"value300007"}
{"key2":"value300008"}
{"key3":"value300009"}
{"key4":"value600000"}
{"key5":"value600001"}
{"key4":"value600002"}
{"key5":"value600003"}
{"key4":"value600004"}
{"key5":"value600005"}
{"key4":"value600006"}
{"key5":"value600007"}
{"key4":"value600008"}
{"key5":"value600009"}
value0 \N \N \N \N \N
\N value1 \N \N \N \N
value2 \N \N \N \N \N
\N value3 \N \N \N \N
value4 \N \N \N \N \N
\N value5 \N \N \N \N
value6 \N \N \N \N \N
\N value7 \N \N \N \N
value8 \N \N \N \N \N
\N value9 \N \N \N \N
\N \N value300000 \N \N \N
\N \N \N value300001 \N \N
\N \N value300002 \N \N \N
\N \N \N value300003 \N \N
\N \N value300004 \N \N \N
\N \N \N value300005 \N \N
\N \N value300006 \N \N \N
\N \N \N value300007 \N \N
\N \N value300008 \N \N \N
\N \N \N value300009 \N \N
\N \N \N \N value600000 \N
\N \N \N \N \N value600001
\N \N \N \N value600002 \N
\N \N \N \N \N value600003
\N \N \N \N value600004 \N
\N \N \N \N \N value600005
\N \N \N \N value600006 \N
\N \N \N \N \N value600007
\N \N \N \N value600008 \N
\N \N \N \N \N value600009

View File

@ -0,0 +1,32 @@
set allow_experimental_json_type = 1;
drop table if exists test;
{% for create_command in ['create table test (x UInt64, json String) engine=MergeTree order by x settings min_rows_for_wide_part=100000000, min_bytes_for_wide_part=1000000000;',
'create table test (x UInt64, json String) engine=MergeTree order by x settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;'] -%}
{{ create_command }}
insert into test select number, toJSONString(map('key' || multiIf(number < 300000, number % 2, number < 600000, number % 2 + 2, number % 2 + 4), 'value' || number)) from numbers(1000000);
alter table test modify column json JSON settings mutations_sync=1;
select 'All paths:';
select distinctJSONPaths(json) from test;
select 'Shared data paths:';
select distinct (arrayJoin(JSONSharedDataPaths(json))) as path from test order by path;
select json from test order by x limit 10;
select json from test order by x limit 10 offset 300000;
select json from test order by x limit 10 offset 600000;
select json.key0, json.key1, json.key2, json.key3, json.key4, json.key5 from test order by x limit 10;
select json.key0, json.key1, json.key2, json.key3, json.key4, json.key5 from test order by x limit 10 offset 300000;
select json.key0, json.key1, json.key2, json.key3, json.key4, json.key5 from test order by x limit 10 offset 600000;
select json from test format Null;
select json from test order by x format Null;
select json.key0, json.key1, json.key2, json.key3, json.key4, json.key5 from test format Null;
select json.key0, json.key1, json.key2, json.key3, json.key4, json.key5 from test order by x format Null;
drop table test;
{% endfor -%}

View File

@ -0,0 +1,12 @@
5000
leonardomso/33-js-concepts 3
ytdl-org/youtube-dl 3
Bogdanp/neko 2
bminossi/AllVideoPocsFromHackerOne 2
disclose/diodata 2
Commit 182
chipeo345 119
phanwi346 114
Nicholas Piggin 95
direwolf-github 49
2

View File

@ -0,0 +1,30 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-s3-storage, long
# ^ no-s3-storage: too memory hungry
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS ghdata"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE ghdata (data String) ENGINE = MergeTree ORDER BY tuple() SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi'"
cat $CUR_DIR/data_json/ghdata_sample.json | ${CLICKHOUSE_CLIENT} \
--max_memory_usage 10G --query "INSERT INTO ghdata FORMAT JSONAsString"
${CLICKHOUSE_CLIENT} -q "ALTER TABLE ghdata MODIFY column data JSON SETTINGS mutations_sync=1" --allow_experimental_json_type 1
${CLICKHOUSE_CLIENT} -q "SELECT count() FROM ghdata WHERE NOT ignore(*)"
${CLICKHOUSE_CLIENT} -q \
"SELECT data.repo.name, count() AS stars FROM ghdata \
WHERE data.type = 'WatchEvent' GROUP BY data.repo.name ORDER BY stars DESC, data.repo.name LIMIT 5"
${CLICKHOUSE_CLIENT} --enable_analyzer=1 -q \
"SELECT data.payload.commits[].author.name AS name, count() AS c FROM ghdata \
ARRAY JOIN data.payload.commits[].author.name \
GROUP BY name ORDER BY c DESC, name LIMIT 5"
${CLICKHOUSE_CLIENT} -q "SELECT max(data.payload.pull_request.assignees[].size0) FROM ghdata"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS ghdata"

View File

@ -0,0 +1,17 @@
set allow_experimental_json_type=1;
set max_insert_block_size=10000;
set max_block_size=10000;
drop table if exists test;
drop named collection if exists json_alter_fuzzer;
create table test (json String) engine=MergeTree order by tuple();
create named collection json_alter_fuzzer AS json_str='{}';
insert into test select * from fuzzJSON(json_alter_fuzzer, reuse_output=true, max_output_length=128) limit 200000;
alter table test modify column json JSON settings mutations_sync=1;
select json from test format Null;
optimize table test final;
select json from test format Null;
drop named collection json_alter_fuzzer;
drop table test;