Merge branch 'master' into ch_canh_fix_prefix_not_like

Duc Canh Le 2022-11-02 09:42:01 +08:00 committed by GitHub
commit c6598dc66d
102 changed files with 1230 additions and 879 deletions

.snyk Normal file
View File

@ -0,0 +1,4 @@
# Snyk (https://snyk.io) policy file
exclude:
global:
- tests/**

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="22.10.1.1877"
ARG VERSION="22.10.2.11"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -21,7 +21,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="22.10.1.1877"
ARG VERSION="22.10.2.11"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -212,4 +212,4 @@ Templates:
## How to Build Documentation
You can build your documentation manually by following the instructions in [docs/tools/README.md](../docs/tools/README.md). Also, our CI runs the documentation build after the `documentation` label is added to the PR. You can see the results of the build in the GitHub interface. If you don't have permission to add labels, a reviewer of your PR will add it.
You can build your documentation manually by following the instructions in the docs repo [contrib-writing-guide](https://github.com/ClickHouse/clickhouse-docs/blob/main/contrib-writing-guide.md). Also, our CI runs the documentation build after the `documentation` label is added to the PR. You can see the results of the build in the GitHub interface. If you don't have permission to add labels, a reviewer of your PR will add it.

View File

@ -0,0 +1,18 @@
---
sidebar_position: 1
sidebar_label: 2022
---
# 2022 Changelog
### ClickHouse release v22.10.2.11-stable (d2bfcaba002) FIXME as compared to v22.10.1.1877-stable (98ab5a3c189)
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
* Backported in [#42750](https://github.com/ClickHouse/ClickHouse/issues/42750): A segmentation fault related to DNS & c-ares has been reported. The error below occurred in multiple threads:
```
2022-09-28 15:41:19.008,2022.09.28 15:41:19.008088 [ 356 ] {} <Fatal> BaseDaemon: ########################################
2022-09-28 15:41:19.008,"2022.09.28 15:41:19.008147 [ 356 ] {} <Fatal> BaseDaemon: (version 22.8.5.29 (official build), build id: 92504ACA0B8E2267) (from thread 353) (no query) Received signal Segmentation fault (11)"
2022-09-28 15:41:19.008,2022.09.28 15:41:19.008196 [ 356 ] {} <Fatal> BaseDaemon: Address: 0xf Access: write. Address not mapped to object.
2022-09-28 15:41:19.008,2022.09.28 15:41:19.008216 [ 356 ] {} <Fatal> BaseDaemon: Stack trace: 0x188f8212 0x1626851b 0x1626a69e 0x16269b3f 0x16267eab 0x13cf8284 0x13d24afc 0x13c5217e 0x14ec2495 0x15ba440f 0x15b9d13b 0x15bb2699 0x1891ccb3 0x1891e00d 0x18ae0769 0x18ade022 0x7f76aa985609 0x7f76aa8aa133
2022-09-28 15:41:19.008,2022.09.28 15:41:19.008274 [ 356 ] {} <Fatal> BaseDaemon: 2. Poco::Net::IPAddress::family() const @ 0x188f8212 in /usr/bin/clickhouse
2022-09-28 15:41:19.008,2022.09.28 15:41:19.008297 [ 356 ] {} <Fatal> BaseDaemon: 3. ? @ 0x1626851b in /usr/bin/clickhouse
2022-09-28 15:41:19.008,2022.09.28 15:41:19.008309 [ 356 ] {} <Fatal> BaseDaemon: 4. ? @ 0x1626a69e in /usr/bin/clickhouse
```
[#42234](https://github.com/ClickHouse/ClickHouse/pull/42234) ([Arthur Passos](https://github.com/arthurpassos)).
* Backported in [#42793](https://github.com/ClickHouse/ClickHouse/issues/42793): Fix a bug in ParserFunction that could have led to a segmentation fault. [#42724](https://github.com/ClickHouse/ClickHouse/pull/42724) ([Nikolay Degterinsky](https://github.com/evillique)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Always run `BuilderReport` and `BuilderSpecialReport` in all CI types [#42684](https://github.com/ClickHouse/ClickHouse/pull/42684) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -1,10 +1,7 @@
---
slug: /en/operations/update
sidebar_position: 47
sidebar_label: ClickHouse Upgrade
---
# ClickHouse Upgrade
[//]: # (This file is included in Manage > Updates)
## Self-managed ClickHouse Upgrade
If ClickHouse was installed from `deb` packages, execute the following commands on the server:

View File

@ -178,7 +178,7 @@ Columns:
- `view_definition` ([String](../../sql-reference/data-types/string.md)) — `SELECT` query for view.
- `check_option` ([String](../../sql-reference/data-types/string.md)) — `NONE`, no checking.
- `is_updatable` ([Enum8](../../sql-reference/data-types/enum.md)) — `NO`, the view is not updated.
- `is_insertable_into` ([Enum8](../../sql-reference/data-types/enum.md)) — Shows whether the created view is [materialized](../../sql-reference/statements/create/view/#materialized). Possible values:
- `is_insertable_into` ([Enum8](../../sql-reference/data-types/enum.md)) — Shows whether the created view is [materialized](../../sql-reference/statements/create/view.md/#materialized-view). Possible values:
- `NO` — The created view is not materialized.
- `YES` — The created view is materialized.
- `is_trigger_updatable` ([Enum8](../../sql-reference/data-types/enum.md)) — `NO`, the trigger is not updated.

View File

@ -68,6 +68,5 @@ thread_id: 54
**See Also**
- [Managing ReplicatedMergeTree Tables](../../sql-reference/statements/system/#query-language-system-replicated)
- [Managing ReplicatedMergeTree Tables](../../sql-reference/statements/system.md/#managing-replicatedmergetree-tables)
[Original article](https://clickhouse.com/docs/en/operations/system_tables/replicated_fetches) <!--hide-->

View File

@ -4,7 +4,7 @@ sidebar_position: 38
sidebar_label: FUNCTION
---
# CREATE FUNCTION
# CREATE FUNCTION &mdash; user defined function (UDF)
Creates a user defined function from a lambda expression. The expression must consist of function parameters, constants, operators, or other function calls.

View File

@ -176,6 +176,9 @@ public:
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
void finalize() override { data->finalize(); }
bool isFinalized() const override { return data->isFinalized(); }
bool isCollationSupported() const override { return getData().isCollationSupported(); }
size_t getNumberOfDimensions() const;

View File

@ -93,6 +93,8 @@ public:
bool structureEquals(const IColumn & rhs) const override;
double getRatioOfDefaultRows(double sample_ratio) const override;
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
void finalize() override { nested->finalize(); }
bool isFinalized() const override { return nested->isFinalized(); }
const ColumnArray & getNestedColumn() const { return assert_cast<const ColumnArray &>(*nested); }
ColumnArray & getNestedColumn() { return assert_cast<ColumnArray &>(*nested); }

View File

@ -732,8 +732,8 @@ void ColumnObject::get(size_t n, Field & res) const
{
assert(n < size());
res = Object();
auto & object = res.get<Object &>();
for (const auto & entry : subcolumns)
{
auto it = object.try_emplace(entry->path.getPath()).first;
@ -744,7 +744,6 @@ void ColumnObject::get(size_t n, Field & res) const
void ColumnObject::insertFrom(const IColumn & src, size_t n)
{
insert(src[n]);
finalize();
}
void ColumnObject::insertRangeFrom(const IColumn & src, size_t start, size_t length)
@ -792,9 +791,8 @@ MutableColumnPtr ColumnObject::applyForSubcolumns(Func && func) const
{
if (!isFinalized())
{
auto finalized = IColumn::mutate(getPtr());
auto finalized = cloneFinalized();
auto & finalized_object = assert_cast<ColumnObject &>(*finalized);
finalized_object.finalize();
return finalized_object.applyForSubcolumns(std::forward<Func>(func));
}

View File

@ -198,10 +198,6 @@ public:
Subcolumns & getSubcolumns() { return subcolumns; }
PathsInData getKeys() const;
/// Finalizes all subcolumns.
void finalize();
bool isFinalized() const;
/// Part of interface
const char * getFamilyName() const override { return "Object"; }
@ -219,12 +215,17 @@ public:
void popBack(size_t length) override;
Field operator[](size_t n) const override;
void get(size_t n, Field & res) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr filter(const Filter & filter, ssize_t result_size_hint) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
ColumnPtr replicate(const Offsets & offsets) const override;
MutableColumnPtr cloneResized(size_t new_size) const override;
/// Finalizes all subcolumns.
void finalize() override;
bool isFinalized() const override;
/// Order of rows in ColumnObject is undefined.
void getPermutation(PermutationSortDirection, PermutationSortStability, size_t, int, Permutation & res) const override;
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
@ -264,9 +265,7 @@ private:
template <typename Func>
MutableColumnPtr applyForSubcolumns(Func && func) const;
/// For given subcolumn return subcolumn from the same Nested type.
/// It's used to get shared sized of Nested to insert correct default values.
const Subcolumns::Node * getLeafOfTheSameNested(const Subcolumns::NodePtr & entry) const;
};
}

View File

@ -570,4 +570,15 @@ void ColumnTuple::getIndicesOfNonDefaultRows(Offsets & indices, size_t from, siz
return getIndicesOfNonDefaultRowsImpl<ColumnTuple>(indices, from, limit);
}
void ColumnTuple::finalize()
{
for (auto & column : columns)
column->finalize();
}
bool ColumnTuple::isFinalized() const
{
return std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column->isFinalized(); });
}
}

View File

@ -103,6 +103,8 @@ public:
ColumnPtr compress() const override;
double getRatioOfDefaultRows(double sample_ratio) const override;
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
void finalize() override;
bool isFinalized() const override;
size_t tupleSize() const { return columns.size(); }

View File

@ -453,6 +453,16 @@ public:
return getPtr();
}
/// Some columns may require finalization before using of other operations.
virtual void finalize() {}
virtual bool isFinalized() const { return true; }
MutablePtr cloneFinalized() const
{
auto finalized = IColumn::mutate(getPtr());
finalized->finalize();
return finalized;
}
[[nodiscard]] static MutablePtr mutate(Ptr ptr)
{

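A side note on the new `finalize()` interface above: `cloneFinalized()` packages the copy-then-finalize pattern, so callers can obtain a finalized column without mutating a shared one (this is exactly what `ColumnObject::applyForSubcolumns` switches to in this commit). A minimal sketch of the intended usage, assuming the ClickHouse headers from this commit; the helper name is illustrative:

```
#include <Columns/IColumn.h>

using namespace DB;

/// Hypothetical helper: return a finalized version of `column`,
/// copying only when finalization is actually needed.
ColumnPtr ensureFinalized(const ColumnPtr & column)
{
    /// Most columns are always finalized; isFinalized() defaults to true.
    if (column->isFinalized())
        return column;

    /// cloneFinalized() = IColumn::mutate(getPtr()) + finalize() on the copy,
    /// leaving the original column untouched.
    return column->cloneFinalized();
}
```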
View File

@ -5,6 +5,7 @@
#include <algorithm>
#include <utility>
#include <base/range.h>
#include <base/unaligned.h>
#include <Common/hex.h>
#include <Common/StringUtils/StringUtils.h>
@ -55,8 +56,11 @@ inline bool parseIPv4(const char * src, unsigned char * dst)
}
if (*(src - 1) != '\0')
return false;
#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
reverseMemcpy(dst, &result, sizeof(result));
#else
memcpy(dst, &result, sizeof(result));
#endif
return true;
}

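The `#if __BYTE_ORDER__` branch above exists because `result` is built up as a host-order integer while `dst` must always end up with the same byte layout; on big-endian hosts the copy therefore has to reverse the bytes. A standalone illustration of the same idea in plain C++ (not ClickHouse code):

```
#include <cstdint>
#include <cstdio>

/// Store a 32-bit value into a buffer in a fixed little-endian layout,
/// independent of host byte order -- the portable equivalent of choosing
/// between memcpy and reverseMemcpy at compile time.
static void storeFixedOrder(uint8_t * dst, uint32_t value)
{
    for (int i = 0; i < 4; ++i)
        dst[i] = static_cast<uint8_t>(value >> (8 * i));
}

int main()
{
    uint8_t buf[4];
    storeFixedOrder(buf, 0x7F000001); /// 127.0.0.1 packed as an integer
    std::printf("%u.%u.%u.%u\n", buf[3], buf[2], buf[1], buf[0]); /// 127.0.0.1
    return 0;
}
```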
View File

@ -48,6 +48,7 @@ public:
bool textCanContainOnlyValidUTF8() const override { return nested->textCanContainOnlyValidUTF8(); }
bool isComparable() const override { return nested->isComparable(); }
bool canBeComparedWithCollation() const override { return nested->canBeComparedWithCollation(); }
bool hasDynamicSubcolumns() const override { return nested->hasDynamicSubcolumns(); }
bool isValueUnambiguouslyRepresentedInContiguousMemoryRegion() const override
{

View File

@ -22,6 +22,27 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
DataTypeMap::DataTypeMap(const DataTypePtr & nested_)
: nested(nested_)
{
const auto * type_array = typeid_cast<const DataTypeArray *>(nested.get());
if (!type_array)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Expected Array(Tuple(key, value)) type, got {}", nested->getName());
const auto * type_tuple = typeid_cast<const DataTypeTuple *>(type_array->getNestedType().get());
if (!type_tuple)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Expected Array(Tuple(key, value)) type, got {}", nested->getName());
if (type_tuple->getElements().size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Expected Array(Tuple(key, value)) type, got {}", nested->getName());
key_type = type_tuple->getElement(0);
value_type = type_tuple->getElement(1);
assertKeyType();
}
DataTypeMap::DataTypeMap(const DataTypes & elems_)
{

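For context, `Map(K, V)` is physically represented as `Array(Tuple(key, value))`, and the new constructor above rebuilds a Map type from that nested representation, throwing `BAD_ARGUMENTS` for any other shape. A hedged usage sketch (assumes the DataTypes headers from this commit; the function name is illustrative):

```
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>

using namespace DB;

DataTypePtr rebuildMapFromNested()
{
    /// The physical layout of Map(String, UInt64): Array(Tuple(key, value)).
    DataTypePtr nested = std::make_shared<DataTypeArray>(
        std::make_shared<DataTypeTuple>(DataTypes{
            std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeUInt64>()}));

    /// Validates the shape and extracts key_type/value_type.
    return std::make_shared<DataTypeMap>(nested);
}
```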
View File

@ -23,6 +23,7 @@ private:
public:
static constexpr bool is_parametric = true;
explicit DataTypeMap(const DataTypePtr & nested_);
explicit DataTypeMap(const DataTypes & elems);
DataTypeMap(const DataTypePtr & key_type_, const DataTypePtr & value_type_);
@ -40,6 +41,7 @@ public:
bool isComparable() const override { return key_type->isComparable() && value_type->isComparable(); }
bool isParametric() const override { return true; }
bool haveSubtypes() const override { return true; }
bool hasDynamicSubcolumns() const override { return nested->hasDynamicSubcolumns(); }
const DataTypePtr & getKeyType() const { return key_type; }
const DataTypePtr & getValueType() const { return value_type; }

View File

@ -36,6 +36,7 @@ public:
bool haveSubtypes() const override { return false; }
bool equals(const IDataType & rhs) const override;
bool isParametric() const override { return true; }
bool hasDynamicSubcolumns() const override { return true; }
SerializationPtr doGetDefaultSerialization() const override;

View File

@ -247,6 +247,11 @@ bool DataTypeTuple::haveMaximumSizeOfValue() const
return std::all_of(elems.begin(), elems.end(), [](auto && elem) { return elem->haveMaximumSizeOfValue(); });
}
bool DataTypeTuple::hasDynamicSubcolumns() const
{
return std::any_of(elems.begin(), elems.end(), [](auto && elem) { return elem->hasDynamicSubcolumns(); });
}
bool DataTypeTuple::isComparable() const
{
return std::all_of(elems.begin(), elems.end(), [](auto && elem) { return elem->isComparable(); });

View File

@ -50,6 +50,7 @@ public:
bool isComparable() const override;
bool textCanContainOnlyValidUTF8() const override;
bool haveMaximumSizeOfValue() const override;
bool hasDynamicSubcolumns() const override;
size_t getMaximumSizeOfValueInMemory() const override;
size_t getSizeOfValueInMemory() const override;

View File

@ -291,6 +291,9 @@ public:
/// Strings, Numbers, Date, DateTime, Nullable
virtual bool canBeInsideLowCardinality() const { return false; }
/// Object, Array(Object), Tuple(..., Object, ...)
virtual bool hasDynamicSubcolumns() const { return false; }
/// Updates avg_value_size_hint for newly read column. Uses to optimize deserialization. Zero expected for first column.
static void updateAvgValueSizeHint(const IColumn & column, double & avg_value_size_hint);

View File

@ -1,17 +1,19 @@
#include <Storages/StorageSnapshot.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/DataTypeObject.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNested.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/getLeastSupertype.h>
#include <DataTypes/NestedUtils.h>
#include <Storages/StorageSnapshot.h>
#include <Columns/ColumnObject.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnNullable.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTExpressionList.h>
@ -105,10 +107,11 @@ Array createEmptyArrayField(size_t num_dimensions)
DataTypePtr getDataTypeByColumn(const IColumn & column)
{
auto idx = column.getDataType();
if (WhichDataType(idx).isSimple())
WhichDataType which(idx);
if (which.isSimple())
return DataTypeFactory::instance().get(String(magic_enum::enum_name(idx)));
if (WhichDataType(idx).isNothing())
if (which.isNothing())
return std::make_shared<DataTypeNothing>();
if (const auto * column_array = checkAndGetColumn<ColumnArray>(&column))
@ -132,41 +135,124 @@ static auto extractVector(const std::vector<Tuple> & vec)
return res;
}
void convertObjectsToTuples(Block & block, const NamesAndTypesList & extended_storage_columns)
static DataTypePtr recreateTupleWithElements(const DataTypeTuple & type_tuple, const DataTypes & elements)
{
std::unordered_map<String, DataTypePtr> storage_columns_map;
for (const auto & [name, type] : extended_storage_columns)
storage_columns_map[name] = type;
return type_tuple.haveExplicitNames()
? std::make_shared<DataTypeTuple>(elements, type_tuple.getElementNames())
: std::make_shared<DataTypeTuple>(elements);
}
static std::pair<ColumnPtr, DataTypePtr> convertObjectColumnToTuple(
const ColumnObject & column_object, const DataTypeObject & type_object)
{
if (!column_object.isFinalized())
{
auto finalized = column_object.cloneFinalized();
const auto & finalized_object = assert_cast<const ColumnObject &>(*finalized);
return convertObjectColumnToTuple(finalized_object, type_object);
}
const auto & subcolumns = column_object.getSubcolumns();
PathsInData tuple_paths;
DataTypes tuple_types;
Columns tuple_columns;
for (const auto & entry : subcolumns)
{
tuple_paths.emplace_back(entry->path);
tuple_types.emplace_back(entry->data.getLeastCommonType());
tuple_columns.emplace_back(entry->data.getFinalizedColumnPtr());
}
return unflattenTuple(tuple_paths, tuple_types, tuple_columns);
}
static std::pair<ColumnPtr, DataTypePtr> recursivlyConvertDynamicColumnToTuple(
const ColumnPtr & column, const DataTypePtr & type)
{
if (!type->hasDynamicSubcolumns())
return {column, type};
if (const auto * type_object = typeid_cast<const DataTypeObject *>(type.get()))
{
const auto & column_object = assert_cast<const ColumnObject &>(*column);
return convertObjectColumnToTuple(column_object, *type_object);
}
if (const auto * type_array = typeid_cast<const DataTypeArray *>(type.get()))
{
const auto & column_array = assert_cast<const ColumnArray &>(*column);
auto [new_column, new_type] = recursivlyConvertDynamicColumnToTuple(
column_array.getDataPtr(), type_array->getNestedType());
return
{
ColumnArray::create(new_column, column_array.getOffsetsPtr()),
std::make_shared<DataTypeArray>(std::move(new_type)),
};
}
if (const auto * type_map = typeid_cast<const DataTypeMap *>(type.get()))
{
const auto & column_map = assert_cast<const ColumnMap &>(*column);
auto [new_column, new_type] = recursivlyConvertDynamicColumnToTuple(
column_map.getNestedColumnPtr(), type_map->getNestedType());
return
{
ColumnMap::create(new_column),
std::make_shared<DataTypeMap>(std::move(new_type)),
};
}
if (const auto * type_tuple = typeid_cast<const DataTypeTuple *>(type.get()))
{
const auto & tuple_columns = assert_cast<const ColumnTuple &>(*column).getColumns();
const auto & tuple_types = type_tuple->getElements();
assert(tuple_columns.size() == tuple_types.size());
const size_t tuple_size = tuple_types.size();
Columns new_tuple_columns(tuple_size);
DataTypes new_tuple_types(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)
{
std::tie(new_tuple_columns[i], new_tuple_types[i])
= recursivlyConvertDynamicColumnToTuple(tuple_columns[i], tuple_types[i]);
}
return
{
ColumnTuple::create(new_tuple_columns),
recreateTupleWithElements(*type_tuple, new_tuple_types)
};
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Type {} unexpectedly has dynamic columns", type->getName());
}
void convertDynamicColumnsToTuples(Block & block, const StorageSnapshotPtr & storage_snapshot)
{
for (auto & column : block)
{
if (!isObject(column.type))
if (!column.type->hasDynamicSubcolumns())
continue;
const auto & column_object = assert_cast<const ColumnObject &>(*column.column);
if (!column_object.isFinalized())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot convert to tuple column '{}' from type {}. Column should be finalized first",
column.name, column.type->getName());
std::tie(column.column, column.type)
= recursivlyConvertDynamicColumnToTuple(column.column, column.type);
std::tie(column.column, column.type) = unflattenObjectToTuple(column_object);
auto it = storage_columns_map.find(column.name);
if (it == storage_columns_map.end())
GetColumnsOptions options(GetColumnsOptions::AllPhysical);
auto storage_column = storage_snapshot->tryGetColumn(options, column.name);
if (!storage_column)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column '{}' not found in storage", column.name);
/// Check that constructed Tuple type and type in storage are compatible.
getLeastCommonTypeForObject({column.type, it->second}, true);
}
}
auto storage_column_concrete = storage_snapshot->getColumn(options.withExtendedObjects(), column.name);
void deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block)
{
if (!storage_snapshot->object_columns.empty())
{
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto storage_columns = storage_snapshot->getColumns(options);
convertObjectsToTuples(block, storage_columns);
/// Check that constructed Tuple type and type in storage are compatible.
getLeastCommonTypeForDynamicColumns(
storage_column->type, {column.type, storage_column_concrete.type}, true);
}
}
@ -217,24 +303,8 @@ void checkObjectHasNoAmbiguosPaths(const PathsInData & paths)
}
}
DataTypePtr getLeastCommonTypeForObject(const DataTypes & types, bool check_ambiguos_paths)
static DataTypePtr getLeastCommonTypeForObject(const DataTypes & types, bool check_ambiguos_paths)
{
if (types.empty())
return nullptr;
bool all_equal = true;
for (size_t i = 1; i < types.size(); ++i)
{
if (!types[i]->equals(*types[0]))
{
all_equal = false;
break;
}
}
if (all_equal)
return types[0];
/// Types of subcolumns by path from all tuples.
std::unordered_map<PathInData, DataTypes, PathInData::Hash> subcolumns_types;
@ -287,19 +357,139 @@ DataTypePtr getLeastCommonTypeForObject(const DataTypes & types, bool check_ambi
return unflattenTuple(tuple_paths, tuple_types);
}
NameSet getNamesOfObjectColumns(const NamesAndTypesList & columns_list)
{
NameSet res;
for (const auto & [name, type] : columns_list)
if (isObject(type))
res.insert(name);
static DataTypePtr getLeastCommonTypeForDynamicColumnsImpl(
const DataTypePtr & type_in_storage, const DataTypes & concrete_types, bool check_ambiguos_paths);
return res;
template<typename Type>
static DataTypePtr getLeastCommonTypeForColumnWithNestedType(
const Type & type, const DataTypes & concrete_types, bool check_ambiguos_paths)
{
DataTypes nested_types;
nested_types.reserve(concrete_types.size());
for (const auto & concrete_type : concrete_types)
{
const auto * type_with_nested_conctete = typeid_cast<const Type *>(concrete_type.get());
if (!type_with_nested_conctete)
throw Exception(ErrorCodes::TYPE_MISMATCH, "Expected {} type, got {}", demangle(typeid(Type).name()), concrete_type->getName());
nested_types.push_back(type_with_nested_conctete->getNestedType());
}
bool hasObjectColumns(const ColumnsDescription & columns)
return std::make_shared<Type>(
getLeastCommonTypeForDynamicColumnsImpl(
type.getNestedType(), nested_types, check_ambiguos_paths));
}
static DataTypePtr getLeastCommonTypeForTuple(
const DataTypeTuple & type, const DataTypes & concrete_types, bool check_ambiguos_paths)
{
return std::any_of(columns.begin(), columns.end(), [](const auto & column) { return isObject(column.type); });
const auto & element_types = type.getElements();
DataTypes new_element_types(element_types.size());
for (size_t i = 0; i < element_types.size(); ++i)
{
DataTypes concrete_element_types;
concrete_element_types.reserve(concrete_types.size());
for (const auto & type_concrete : concrete_types)
{
const auto * type_tuple_conctete = typeid_cast<const DataTypeTuple *>(type_concrete.get());
if (!type_tuple_conctete)
throw Exception(ErrorCodes::TYPE_MISMATCH, "Expected Tuple type, got {}", type_concrete->getName());
concrete_element_types.push_back(type_tuple_conctete->getElement(i));
}
new_element_types[i] = getLeastCommonTypeForDynamicColumnsImpl(
element_types[i], concrete_element_types, check_ambiguos_paths);
}
return recreateTupleWithElements(type, new_element_types);
}
static DataTypePtr getLeastCommonTypeForDynamicColumnsImpl(
const DataTypePtr & type_in_storage, const DataTypes & concrete_types, bool check_ambiguos_paths)
{
if (!type_in_storage->hasDynamicSubcolumns())
return type_in_storage;
if (isObject(type_in_storage))
return getLeastCommonTypeForObject(concrete_types, check_ambiguos_paths);
if (const auto * type_array = typeid_cast<const DataTypeArray *>(type_in_storage.get()))
return getLeastCommonTypeForColumnWithNestedType(*type_array, concrete_types, check_ambiguos_paths);
if (const auto * type_map = typeid_cast<const DataTypeMap *>(type_in_storage.get()))
return getLeastCommonTypeForColumnWithNestedType(*type_map, concrete_types, check_ambiguos_paths);
if (const auto * type_tuple = typeid_cast<const DataTypeTuple *>(type_in_storage.get()))
return getLeastCommonTypeForTuple(*type_tuple, concrete_types, check_ambiguos_paths);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Type {} unexpectedly has dynamic columns", type_in_storage->getName());
}
DataTypePtr getLeastCommonTypeForDynamicColumns(
const DataTypePtr & type_in_storage, const DataTypes & concrete_types, bool check_ambiguos_paths)
{
if (concrete_types.empty())
return nullptr;
bool all_equal = true;
for (size_t i = 1; i < concrete_types.size(); ++i)
{
if (!concrete_types[i]->equals(*concrete_types[0]))
{
all_equal = false;
break;
}
}
if (all_equal)
return concrete_types[0];
return getLeastCommonTypeForDynamicColumnsImpl(type_in_storage, concrete_types, check_ambiguos_paths);
}
DataTypePtr createConcreteEmptyDynamicColumn(const DataTypePtr & type_in_storage)
{
if (!type_in_storage->hasDynamicSubcolumns())
return type_in_storage;
if (isObject(type_in_storage))
return std::make_shared<DataTypeTuple>(
DataTypes{std::make_shared<DataTypeUInt8>()}, Names{ColumnObject::COLUMN_NAME_DUMMY});
if (const auto * type_array = typeid_cast<const DataTypeArray *>(type_in_storage.get()))
return std::make_shared<DataTypeArray>(
createConcreteEmptyDynamicColumn(type_array->getNestedType()));
if (const auto * type_map = typeid_cast<const DataTypeMap *>(type_in_storage.get()))
return std::make_shared<DataTypeMap>(
createConcreteEmptyDynamicColumn(type_map->getNestedType()));
if (const auto * type_tuple = typeid_cast<const DataTypeTuple *>(type_in_storage.get()))
{
const auto & elements = type_tuple->getElements();
DataTypes new_elements;
new_elements.reserve(elements.size());
for (const auto & element : elements)
new_elements.push_back(createConcreteEmptyDynamicColumn(element));
return recreateTupleWithElements(*type_tuple, new_elements);
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Type {} unexpectedly has dynamic columns", type_in_storage->getName());
}
bool hasDynamicSubcolumns(const ColumnsDescription & columns)
{
return std::any_of(columns.begin(), columns.end(),
[](const auto & column)
{
return column.type->hasDynamicSubcolumns();
});
}
void extendObjectColumns(NamesAndTypesList & columns_list, const ColumnsDescription & object_columns, bool with_subcolumns)
@ -320,16 +510,20 @@ void extendObjectColumns(NamesAndTypesList & columns_list, const ColumnsDescript
columns_list.splice(columns_list.end(), std::move(subcolumns_list));
}
void updateObjectColumns(ColumnsDescription & object_columns, const NamesAndTypesList & new_columns)
void updateObjectColumns(
ColumnsDescription & object_columns,
const ColumnsDescription & storage_columns,
const NamesAndTypesList & new_columns)
{
for (const auto & new_column : new_columns)
{
auto object_column = object_columns.tryGetColumn(GetColumnsOptions::All, new_column.name);
if (object_column && !object_column->type->equals(*new_column.type))
{
auto storage_column = storage_columns.getColumn(GetColumnsOptions::All, new_column.name);
object_columns.modify(new_column.name, [&](auto & column)
{
column.type = getLeastCommonTypeForObject({object_column->type, new_column.type});
column.type = getLeastCommonTypeForDynamicColumns(storage_column.type, {object_column->type, new_column.type});
});
}
}
@ -745,13 +939,6 @@ void replaceMissedSubcolumnsByConstants(
addConstantToWithClause(query, name, type);
}
void finalizeObjectColumns(const MutableColumns & columns)
{
for (const auto & column : columns)
if (auto * column_object = typeid_cast<ColumnObject *>(column.get()))
column_object->finalize();
}
Field FieldVisitorReplaceScalars::operator()(const Array & x) const
{
if (num_dimensions_to_keep == 0)
@ -768,11 +955,13 @@ size_t FieldVisitorToNumberOfDimensions::operator()(const Array & x)
{
const size_t size = x.size();
size_t dimensions = 0;
for (size_t i = 0; i < size; ++i)
{
size_t element_dimensions = applyVisitor(*this, x[i]);
if (i > 0 && element_dimensions != dimensions)
need_fold_dimension = true;
dimensions = std::max(dimensions, element_dimensions);
}
@ -783,12 +972,13 @@ Field FieldVisitorFoldDimension::operator()(const Array & x) const
{
if (num_dimensions_to_fold == 0)
return x;
const size_t size = x.size();
Array res(size);
for (size_t i = 0; i < size; ++i)
{
res[i] = applyVisitor(FieldVisitorFoldDimension(num_dimensions_to_fold - 1), x[i]);
}
return res;
}
}

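The helpers added in this file (`recursivlyConvertDynamicColumnToTuple`, `getLeastCommonTypeForDynamicColumnsImpl`, `createConcreteEmptyDynamicColumn`) all share one traversal shape: descend through `Array`/`Map` wrappers and `Tuple` elements until the `Object` leaves are reached, then rebuild the wrappers around the transformed leaves. A distilled sketch of that traversal over a toy type tree (illustrative types, not the real classes):

```
#include <memory>
#include <vector>

/// Toy type tree mirroring the recursion pattern above.
struct Type
{
    enum Kind { Object, Array, Map, Tuple, Scalar } kind = Scalar;
    std::vector<std::shared_ptr<Type>> children; /// nested / element types
};

using TypePtr = std::shared_ptr<Type>;

bool hasDynamicSubcolumns(const TypePtr & t)
{
    if (t->kind == Type::Object)
        return true;
    for (const auto & child : t->children)
        if (hasDynamicSubcolumns(child))
            return true;
    return false;
}

/// Replace each Object leaf while keeping Array/Map/Tuple wrappers intact,
/// the same descent the real helpers perform on (column, type) pairs.
TypePtr replaceObjects(const TypePtr & t, const TypePtr & replacement)
{
    if (!hasDynamicSubcolumns(t))
        return t; /// nothing dynamic below: reuse as-is
    if (t->kind == Type::Object)
        return replacement;
    auto res = std::make_shared<Type>(*t);
    for (auto & child : res->children)
        child = replaceObjects(child, replacement);
    return res;
}
```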
View File

@ -39,27 +39,31 @@ Array createEmptyArrayField(size_t num_dimensions);
DataTypePtr getDataTypeByColumn(const IColumn & column);
/// Converts Object types and columns to Tuples in @columns_list and @block
/// and checks that types are consistent with types in @extended_storage_columns.
void convertObjectsToTuples(Block & block, const NamesAndTypesList & extended_storage_columns);
void deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block);
/// and checks that types are consistent with types in @storage_snapshot.
void convertDynamicColumnsToTuples(Block & block, const StorageSnapshotPtr & storage_snapshot);
/// Checks that each path is not the prefix of any other path.
void checkObjectHasNoAmbiguosPaths(const PathsInData & paths);
/// Receives several Tuple types and deduces the least common type among them.
DataTypePtr getLeastCommonTypeForObject(const DataTypes & types, bool check_ambiguos_paths = false);
DataTypePtr getLeastCommonTypeForDynamicColumns(
const DataTypePtr & type_in_storage, const DataTypes & types, bool check_ambiguos_paths = false);
DataTypePtr createConcreteEmptyDynamicColumn(const DataTypePtr & type_in_storage);
/// Converts types of object columns to tuples in @columns_list
/// according to @object_columns and adds all tuple's subcolumns if needed.
void extendObjectColumns(NamesAndTypesList & columns_list, const ColumnsDescription & object_columns, bool with_subcolumns);
NameSet getNamesOfObjectColumns(const NamesAndTypesList & columns_list);
bool hasObjectColumns(const ColumnsDescription & columns);
void finalizeObjectColumns(const MutableColumns & columns);
/// Checks whether @columns contain any column with dynamic subcolumns.
bool hasDynamicSubcolumns(const ColumnsDescription & columns);
/// Updates types of objects in @object_columns inplace
/// according to types in new_columns.
void updateObjectColumns(ColumnsDescription & object_columns, const NamesAndTypesList & new_columns);
void updateObjectColumns(
ColumnsDescription & object_columns,
const ColumnsDescription & storage_columns,
const NamesAndTypesList & new_columns);
using DataTypeTuplePtr = std::shared_ptr<DataTypeTuple>;
@ -142,6 +146,7 @@ public:
{
if (num_dimensions_to_fold == 0)
return x;
Array res(1, x);
for (size_t i = 1; i < num_dimensions_to_fold; ++i)
{
@ -149,6 +154,7 @@ public:
new_res.push_back(std::move(res));
res = std::move(new_res);
}
return res;
}
@ -163,7 +169,7 @@ private:
/// columns-like objects from entry to which Iterator points.
/// columns-like object should have fields "name" and "type".
template <typename Iterator, typename EntryColumnsGetter>
ColumnsDescription getObjectColumns(
ColumnsDescription getConcreteObjectColumns(
Iterator begin, Iterator end,
const ColumnsDescription & storage_columns,
EntryColumnsGetter && entry_columns_getter)
@ -176,14 +182,8 @@ ColumnsDescription getObjectColumns(
/// dummy column will be removed.
for (const auto & column : storage_columns)
{
if (isObject(column.type))
{
auto tuple_type = std::make_shared<DataTypeTuple>(
DataTypes{std::make_shared<DataTypeUInt8>()},
Names{ColumnObject::COLUMN_NAME_DUMMY});
types_in_entries[column.name].push_back(std::move(tuple_type));
}
if (column.type->hasDynamicSubcolumns())
types_in_entries[column.name].push_back(createConcreteEmptyDynamicColumn(column.type));
}
for (auto it = begin; it != end; ++it)
@ -192,14 +192,17 @@ ColumnsDescription getObjectColumns(
for (const auto & column : entry_columns)
{
auto storage_column = storage_columns.tryGetPhysical(column.name);
if (storage_column && isObject(storage_column->type))
if (storage_column && storage_column->type->hasDynamicSubcolumns())
types_in_entries[column.name].push_back(column.type);
}
}
ColumnsDescription res;
for (const auto & [name, types] : types_in_entries)
res.add({name, getLeastCommonTypeForObject(types)});
{
auto storage_column = storage_columns.getPhysical(name);
res.add({name, getLeastCommonTypeForDynamicColumns(storage_column.type, types)});
}
return res;
}

View File

@ -249,7 +249,9 @@ public:
};
/// Call before serializeBinaryBulkWithMultipleStreams chain to write something before first mark.
/// Column may be used only to retrieve the structure.
virtual void serializeBinaryBulkStatePrefix(
const IColumn & /*column*/,
SerializeBinaryBulkSettings & /*settings*/,
SerializeBinaryBulkStatePtr & /*state*/) const {}

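Because `serializeBinaryBulkStatePrefix` now receives the column, every caller of the serialize chain has to pass it explicitly. A sketch of the call sequence, adapted from the unit test updated later in this commit (`serialization`, `column`, and the output buffer `out` are assumed to be in scope):

```
/// Prefix -> bulk -> suffix, with the column now threaded through the prefix
/// so the serialization can inspect its structure (SerializationObject uses
/// this to derive the internal Tuple layout up front).
ISerialization::SerializeBinaryBulkSettings settings;
ISerialization::SerializeBinaryBulkStatePtr state;
settings.getter = [&out](const auto &) { return &out; };

serialization->serializeBinaryBulkStatePrefix(*column, settings, state);
serialization->serializeBinaryBulkWithMultipleStreams(*column, 0, column->size(), settings, state);
serialization->serializeBinaryBulkStateSuffix(settings, state);
```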
View File

@ -246,11 +246,13 @@ void SerializationArray::enumerateStreams(
}
void SerializationArray::serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::ArrayElements);
nested->serializeBinaryBulkStatePrefix(settings, state);
const auto & column_array = assert_cast<const ColumnArray &>(column);
nested->serializeBinaryBulkStatePrefix(column_array.getData(), settings, state);
settings.path.pop_back();
}

View File

@ -41,6 +41,7 @@ public:
const SubstreamData & data) const override;
void serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;

View File

@ -221,6 +221,7 @@ struct DeserializeStateLowCardinality : public ISerialization::DeserializeBinary
};
void SerializationLowCardinality::serializeBinaryBulkStatePrefix(
const IColumn & /*column*/,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{

View File

@ -23,6 +23,7 @@ public:
const SubstreamData & data) const override;
void serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;

View File

@ -270,10 +270,11 @@ void SerializationMap::enumerateStreams(
}
void SerializationMap::serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
nested->serializeBinaryBulkStatePrefix(settings, state);
nested->serializeBinaryBulkStatePrefix(extractNestedColumn(column), settings, state);
}
void SerializationMap::serializeBinaryBulkStateSuffix(

View File

@ -37,6 +37,7 @@ public:
const SubstreamData & data) const override;
void serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;

View File

@ -17,11 +17,12 @@ void SerializationNamed::enumerateStreams(
}
void SerializationNamed::serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
addToPath(settings.path);
nested_serialization->serializeBinaryBulkStatePrefix(settings, state);
nested_serialization->serializeBinaryBulkStatePrefix(column, settings, state);
settings.path.pop_back();
}

View File

@ -31,6 +31,7 @@ public:
const SubstreamData & data) const override;
void serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;

View File

@ -70,11 +70,13 @@ void SerializationNullable::enumerateStreams(
}
void SerializationNullable::serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::NullableElements);
nested->serializeBinaryBulkStatePrefix(settings, state);
const auto & column_nullable = assert_cast<const ColumnNullable &>(column);
nested->serializeBinaryBulkStatePrefix(column_nullable.getNestedColumn(), settings, state);
settings.path.pop_back();
}

View File

@ -19,6 +19,7 @@ public:
const SubstreamData & data) const override;
void serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;

View File

@ -13,8 +13,6 @@
#include <Columns/ColumnString.h>
#include <Functions/FunctionsConversion.h>
#include <Common/FieldVisitorToString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/VarInt.h>
@ -30,6 +28,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
extern const int INCORRECT_DATA;
extern const int CANNOT_READ_ALL_DATA;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
}
@ -141,7 +140,6 @@ void SerializationObject<Parser>::checkSerializationIsSupported(const TSettings
template <typename Parser>
struct SerializationObject<Parser>::SerializeStateObject : public ISerialization::SerializeBinaryBulkState
{
bool is_first = true;
DataTypePtr nested_type;
SerializationPtr nested_serialization;
SerializeBinaryBulkStatePtr nested_state;
@ -158,6 +156,7 @@ struct SerializationObject<Parser>::DeserializeStateObject : public ISerializati
template <typename Parser>
void SerializationObject<Parser>::serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
@ -166,15 +165,34 @@ void SerializationObject<Parser>::serializeBinaryBulkStatePrefix(
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"DataTypeObject doesn't support serialization with non-trivial state");
const auto & column_object = assert_cast<const ColumnObject &>(column);
if (!column_object.isFinalized())
{
auto finalized = column_object.cloneFinalized();
serializeBinaryBulkStatePrefix(*finalized, settings, state);
return;
}
settings.path.push_back(Substream::ObjectStructure);
auto * stream = settings.getter(settings.path);
settings.path.pop_back();
if (!stream)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing stream for kind of binary serialization");
auto [tuple_column, tuple_type] = unflattenObjectToTuple(column_object);
writeIntBinary(static_cast<UInt8>(BinarySerializationKind::TUPLE), *stream);
state = std::make_shared<SerializeStateObject>();
writeStringBinary(tuple_type->getName(), *stream);
auto state_object = std::make_shared<SerializeStateObject>();
state_object->nested_type = tuple_type;
state_object->nested_serialization = tuple_type->getDefaultSerialization();
settings.path.back() = Substream::ObjectData;
state_object->nested_serialization->serializeBinaryBulkStatePrefix(*tuple_column, settings, state_object->nested_state);
state = std::move(state_object);
settings.path.pop_back();
}
template <typename Parser>
@ -261,33 +279,14 @@ void SerializationObject<Parser>::serializeBinaryBulkWithMultipleStreams(
if (!column_object.isFinalized())
{
auto finalized_object = column_object.clone();
assert_cast<ColumnObject &>(*finalized_object).finalize();
serializeBinaryBulkWithMultipleStreams(*finalized_object, offset, limit, settings, state);
auto finalized = column_object.cloneFinalized();
serializeBinaryBulkWithMultipleStreams(*finalized, offset, limit, settings, state);
return;
}
auto [tuple_column, tuple_type] = unflattenObjectToTuple(column_object);
if (state_object->is_first)
{
/// Actually it's a part of serializeBinaryBulkStatePrefix,
/// but it cannot be done there, because we have to know the
/// structure of column.
settings.path.push_back(Substream::ObjectStructure);
if (auto * stream = settings.getter(settings.path))
writeStringBinary(tuple_type->getName(), *stream);
state_object->nested_type = tuple_type;
state_object->nested_serialization = tuple_type->getDefaultSerialization();
state_object->is_first = false;
settings.path.back() = Substream::ObjectData;
state_object->nested_serialization->serializeBinaryBulkStatePrefix(settings, state_object->nested_state);
settings.path.pop_back();
}
else if (!state_object->nested_type->equals(*tuple_type))
if (!state_object->nested_type->equals(*tuple_type))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Types of internal column of Object mismatched. Expected: {}, Got: {}",
@ -411,18 +410,63 @@ void SerializationObject<Parser>::serializeTextImpl(const IColumn & column, size
writeChar('{', ostr);
for (auto it = subcolumns.begin(); it != subcolumns.end(); ++it)
{
const auto & entry = *it;
if (it != subcolumns.begin())
writeCString(",", ostr);
writeDoubleQuoted((*it)->path.getPath(), ostr);
writeDoubleQuoted(entry->path.getPath(), ostr);
writeChar(':', ostr);
auto serialization = (*it)->data.getLeastCommonType()->getDefaultSerialization();
serialization->serializeTextJSON((*it)->data.getFinalizedColumn(), row_num, ostr, settings);
serializeTextFromSubcolumn(entry->data, row_num, ostr, settings);
}
writeChar('}', ostr);
}
template <typename Parser>
void SerializationObject<Parser>::serializeTextFromSubcolumn(
const ColumnObject::Subcolumn & subcolumn, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
const auto & least_common_type = subcolumn.getLeastCommonType();
if (subcolumn.isFinalized())
{
const auto & finalized_column = subcolumn.getFinalizedColumn();
auto info = least_common_type->getSerializationInfo(finalized_column);
auto serialization = least_common_type->getSerialization(*info);
serialization->serializeTextJSON(finalized_column, row_num, ostr, settings);
return;
}
size_t ind = row_num;
if (ind < subcolumn.getNumberOfDefaultsInPrefix())
{
/// Suboptimal, but it should happen rarely.
auto tmp_column = subcolumn.getLeastCommonType()->createColumn();
tmp_column->insertDefault();
auto info = least_common_type->getSerializationInfo(*tmp_column);
auto serialization = least_common_type->getSerialization(*info);
serialization->serializeTextJSON(*tmp_column, 0, ostr, settings);
return;
}
ind -= subcolumn.getNumberOfDefaultsInPrefix();
for (const auto & part : subcolumn.getData())
{
if (ind < part->size())
{
auto part_type = getDataTypeByColumn(*part);
auto info = part_type->getSerializationInfo(*part);
auto serialization = part_type->getSerialization(*info);
serialization->serializeTextJSON(*part, ind, ostr, settings);
return;
}
ind -= part->size();
}
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Index ({}) for text serialization is out of range", row_num);
}
template <typename Parser>
void SerializationObject<Parser>::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{

View File

@ -8,7 +8,7 @@ namespace DB
{
/** Serialization for data type Object.
* Supported only test serialization/deserialization.
* Supported only text serialization/deserialization.
* and binary bulk serialization/deserialization without position independent
* encoding, i.e. serialization/deserialization into Native format.
*/
@ -31,6 +31,7 @@ public:
*/
void serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
@ -104,6 +105,7 @@ private:
void deserializeTextImpl(IColumn & column, Reader && reader) const;
void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const;
void serializeTextFromSubcolumn(const ColumnObject::Subcolumn & subcolumn, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const;
/// Pool of parser objects to make SerializationObject thread safe.
mutable SimpleObjectPool<Parser> parsers_pool;

View File

@ -178,11 +178,16 @@ void SerializationSparse::enumerateStreams(
}
void SerializationSparse::serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::SparseElements);
nested->serializeBinaryBulkStatePrefix(settings, state);
if (const auto * column_sparse = typeid_cast<const ColumnSparse *>(&column))
nested->serializeBinaryBulkStatePrefix(column_sparse->getValuesColumn(), settings, state);
else
nested->serializeBinaryBulkStatePrefix(column, settings, state);
settings.path.pop_back();
}

View File

@ -33,6 +33,7 @@ public:
const SubstreamData & data) const override;
void serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;

View File

@ -314,6 +314,7 @@ struct DeserializeBinaryBulkStateTuple : public ISerialization::DeserializeBinar
void SerializationTuple::serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
@ -321,7 +322,7 @@ void SerializationTuple::serializeBinaryBulkStatePrefix(
tuple_state->states.resize(elems.size());
for (size_t i = 0; i < elems.size(); ++i)
elems[i]->serializeBinaryBulkStatePrefix(settings, tuple_state->states[i]);
elems[i]->serializeBinaryBulkStatePrefix(extractElementColumn(column, i), settings, tuple_state->states[i]);
state = std::move(tuple_state);
}

View File

@ -39,6 +39,7 @@ public:
const SubstreamData & data) const override;
void serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;

View File

@ -13,10 +13,11 @@ void SerializationWrapper::enumerateStreams(
}
void SerializationWrapper::serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
nested_serialization->serializeBinaryBulkStatePrefix(settings, state);
nested_serialization->serializeBinaryBulkStatePrefix(column, settings, state);
}
void SerializationWrapper::serializeBinaryBulkStateSuffix(

View File

@ -26,6 +26,7 @@ public:
const SubstreamData & data) const override;
void serializeBinaryBulkStatePrefix(
const IColumn & column,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;

View File

@ -31,7 +31,7 @@ TEST(SerializationObject, FromString)
settings.getter = [&out](const auto &) { return &out; };
writeIntBinary(static_cast<UInt8>(1), out);
serialization->serializeBinaryBulkStatePrefix(settings, state);
serialization->serializeBinaryBulkStatePrefix(*column_string, settings, state);
serialization->serializeBinaryBulkWithMultipleStreams(*column_string, 0, column_string->size(), settings, state);
serialization->serializeBinaryBulkStateSuffix(settings, state);
}

View File

@ -141,7 +141,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), object.absolute_path);
}
void AzureObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
void AzureObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize & children) const
{
auto client_ptr = client.get();

View File

@ -84,7 +84,7 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
void findAllFiles(const std::string & path, RelativePathsWithSize & children) const override;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void removeObject(const StoredObject & object) override;

View File

@ -282,9 +282,9 @@ std::unique_ptr<IObjectStorage> CachedObjectStorage::cloneObjectStorage(
return object_storage->cloneObjectStorage(new_namespace, config, config_prefix, context);
}
void CachedObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
void CachedObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize & children) const
{
object_storage->listPrefix(path, children);
object_storage->findAllFiles(path, children);
}
ObjectMetadata CachedObjectStorage::getObjectMetadata(const std::string & path) const

View File

@ -72,7 +72,7 @@ public:
const std::string & config_prefix,
ContextPtr context) override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
void findAllFiles(const std::string & path, RelativePathsWithSize & children) const override;
ObjectMetadata getObjectMetadata(const std::string & path) const override;

View File

@ -390,7 +390,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
};
RelativePathsWithSize children;
source_object_storage->listPrefix(restore_information.source_path, children);
source_object_storage->findAllFiles(restore_information.source_path, children);
restore_files(children);
@ -540,7 +540,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
};
RelativePathsWithSize children;
source_object_storage->listPrefix(restore_information.source_path + "operations/", children);
source_object_storage->findAllFiles(restore_information.source_path + "operations/", children);
restore_file_operations(children);
if (restore_information.detached)

View File

@ -101,18 +101,6 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
}
void HDFSObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
{
const size_t begin_of_path = path.find('/', path.find("//") + 2);
int32_t num_entries;
auto * files_list = hdfsListDirectory(hdfs_fs.get(), path.substr(begin_of_path).c_str(), &num_entries);
if (num_entries == -1)
throw Exception(ErrorCodes::HDFS_ERROR, "HDFSDelete failed with path: " + path);
for (int32_t i = 0; i < num_entries; ++i)
children.emplace_back(files_list[i].mName, files_list[i].mSize);
}
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void HDFSObjectStorage::removeObject(const StoredObject & object)
{

View File

@ -85,8 +85,6 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void removeObject(const StoredObject & object) override;

View File

@ -11,10 +11,16 @@
#include <Disks/DirectoryIterator.h>
#include <Disks/WriteMode.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/ErrorCodes.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
class IMetadataStorage;
/// Tries to provide some "transactions" interface, which allow
@ -33,32 +39,71 @@ public:
/// General purpose methods
/// Write metadata string to file
virtual void writeStringToFile(const std::string & path, const std::string & data) = 0;
virtual void writeStringToFile(const std::string & /* path */, const std::string & /* data */)
{
throwNotImplemented();
}
virtual void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) = 0;
virtual void setLastModified(const std::string & /* path */, const Poco::Timestamp & /* timestamp */)
{
throwNotImplemented();
}
virtual bool supportsChmod() const = 0;
virtual void chmod(const String & path, mode_t mode) = 0;
virtual void chmod(const String & /* path */, mode_t /* mode */)
{
throwNotImplemented();
}
virtual void setReadOnly(const std::string & path) = 0;
virtual void setReadOnly(const std::string & /* path */)
{
throwNotImplemented();
}
virtual void unlinkFile(const std::string & path) = 0;
virtual void unlinkFile(const std::string & /* path */)
{
throwNotImplemented();
}
virtual void createDirectory(const std::string & path) = 0;
virtual void createDirectory(const std::string & /* path */)
{
throwNotImplemented();
}
virtual void createDirectoryRecursive(const std::string & path) = 0;
virtual void createDirectoryRecursive(const std::string & /* path */)
{
throwNotImplemented();
}
virtual void removeDirectory(const std::string & path) = 0;
virtual void removeDirectory(const std::string & /* path */)
{
throwNotImplemented();
}
virtual void removeRecursive(const std::string & path) = 0;
virtual void removeRecursive(const std::string & /* path */)
{
throwNotImplemented();
}
virtual void createHardLink(const std::string & path_from, const std::string & path_to) = 0;
virtual void createHardLink(const std::string & /* path_from */, const std::string & /* path_to */)
{
throwNotImplemented();
}
virtual void moveFile(const std::string & path_from, const std::string & path_to) = 0;
virtual void moveFile(const std::string & /* path_from */, const std::string & /* path_to */)
{
throwNotImplemented();
}
virtual void moveDirectory(const std::string & path_from, const std::string & path_to) = 0;
virtual void moveDirectory(const std::string & /* path_from */, const std::string & /* path_to */)
{
throwNotImplemented();
}
virtual void replaceFile(const std::string & path_from, const std::string & path_to) = 0;
virtual void replaceFile(const std::string & /* path_from */, const std::string & /* path_to */)
{
throwNotImplemented();
}
/// Metadata related methods
@ -69,7 +114,10 @@ public:
virtual void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) = 0;
/// Add to new blob to metadata file (way to implement appends)
virtual void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) = 0;
virtual void addBlobToMetadata(const std::string & /* path */, const std::string & /* blob_name */, uint64_t /* size_in_bytes */)
{
throwNotImplemented();
}
/// Unlink metadata file and do something special if required
/// By default just remove file (unlink file).
@ -79,6 +127,12 @@ public:
}
virtual ~IMetadataTransaction() = default;
private:
[[noreturn]] static void throwNotImplemented()
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation is not implemented");
}
};
using MetadataTransactionPtr = std::shared_ptr<IMetadataTransaction>;
@ -106,12 +160,18 @@ public:
virtual Poco::Timestamp getLastModified(const std::string & path) const = 0;
virtual time_t getLastChanged(const std::string & path) const = 0;
virtual time_t getLastChanged(const std::string & /* path */) const
{
throwNotImplemented();
}
virtual bool supportsChmod() const = 0;
virtual bool supportsStat() const = 0;
virtual struct stat stat(const String & path) const = 0;
virtual struct stat stat(const String & /* path */) const
{
throwNotImplemented();
}
virtual std::vector<std::string> listDirectory(const std::string & path) const = 0;
@ -120,20 +180,32 @@ public:
virtual uint32_t getHardlinkCount(const std::string & path) const = 0;
/// Read metadata file to string from path
virtual std::string readFileToString(const std::string & path) const = 0;
virtual std::string readFileToString(const std::string & /* path */) const
{
throwNotImplemented();
}
virtual ~IMetadataStorage() = default;
/// ==== More specific methods. Previous were almost general purpose. ====
/// Read multiple metadata files into strings and return mapping from file_path -> metadata
virtual std::unordered_map<std::string, std::string> getSerializedMetadata(const std::vector<String> & file_paths) const = 0;
virtual std::unordered_map<std::string, std::string> getSerializedMetadata(const std::vector<String> & /* file_paths */) const
{
throwNotImplemented();
}
/// Return object information (absolute_path, bytes_size, ...) for metadata path.
/// object_storage_path is absolute.
virtual StoredObjects getStorageObjects(const std::string & path) const = 0;
virtual std::string getObjectStorageRootPath() const = 0;
private:
[[noreturn]] static void throwNotImplemented()
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation is not implemented");
}
};
using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;

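The refactoring above turns most pure virtuals into default implementations that throw `NOT_IMPLEMENTED`, so read-only or limited backends only override what they actually support. A self-contained sketch of the same pattern in plain C++ (illustrative names, not the ClickHouse classes):

```
#include <stdexcept>
#include <string>

/// Interface where operations are optional: unsupported ones throw at call
/// time instead of forcing every subclass to stub them out.
class IStorageLike
{
public:
    virtual ~IStorageLike() = default;

    virtual void moveFile(const std::string & /* from */, const std::string & /* to */)
    {
        throwNotImplemented();
    }

    virtual void removeRecursive(const std::string & /* path */)
    {
        throwNotImplemented();
    }

private:
    [[noreturn]] static void throwNotImplemented()
    {
        throw std::runtime_error("Operation is not implemented");
    }
};

/// A read-only backend overrides nothing; calling moveFile() on it throws,
/// which is the behavior this commit wants for s3_plain-style storages.
class ReadOnlyStorage : public IStorageLike
{
};
```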
View File

@ -14,6 +14,17 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
void IObjectStorage::findAllFiles(const std::string &, RelativePathsWithSize &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "findAllFiles() is not supported");
}
void IObjectStorage::getDirectoryContents(const std::string &,
RelativePathsWithSize &,
std::vector<std::string> &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getDirectoryContents() is not supported");
}
IAsynchronousReader & IObjectStorage::getThreadPoolReader()
{
auto context = Context::getGlobalContextInstance();

View File

@ -65,8 +65,32 @@ public:
/// Object exists or not
virtual bool exists(const StoredObject & object) const = 0;
/// List on prefix, return children (relative paths) with their sizes.
virtual void listPrefix(const std::string & path, RelativePathsWithSize & children) const = 0;
/// List all objects with specific prefix.
///
/// For example, if you run this over a filesystem, you should skip folders and
/// return files only, i.e. something like this on a local filesystem:
///
/// find . -type f
///
/// @param children - out files (relative paths) with their sizes.
///
/// NOTE: It makes sense only for real object storages (S3, Azure), since
/// it is used only for one of the following:
/// - send_metadata (to restore metadata)
/// - see DiskObjectStorage::restoreMetadataIfNeeded()
/// - MetadataStorageFromPlainObjectStorage - only for s3_plain disk
virtual void findAllFiles(const std::string & path, RelativePathsWithSize & children) const;
/// Analog of directory contents for object storage (object storage does not
/// have a "directory" concept, but it can be emulated with the usage of a
/// "delimiter"), so this is an analog of:
///
/// find $path -maxdepth 1
///
/// Return files in @files and directories in @directories
virtual void getDirectoryContents(const std::string & path,
RelativePathsWithSize & files,
std::vector<std::string> & directories) const;
/// Get object metadata if supported. It should be possible to receive
/// at least size of object
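The two listing methods can be illustrated over a local filesystem, although that is only an analogy: the real backends answer these via object-storage listing APIs (see the S3 implementation later in this diff). A sketch under that assumption, using std::filesystem and hypothetical helper names.

#include <filesystem>
#include <iostream>
#include <string>
#include <vector>

namespace fs = std::filesystem;

/// findAllFiles-like: every file under the prefix, recursively (find $path -type f).
std::vector<std::string> findAllFilesLike(const std::string & root)
{
    std::vector<std::string> files;
    for (const auto & entry : fs::recursive_directory_iterator(root))
        if (entry.is_regular_file())
            files.push_back(entry.path().string());
    return files;
}

/// getDirectoryContents-like: one level only, files and directories
/// reported separately (find $path -maxdepth 1).
void directoryContentsLike(const std::string & root,
    std::vector<std::string> & files, std::vector<std::string> & directories)
{
    for (const auto & entry : fs::directory_iterator(root))
    {
        if (entry.is_regular_file())
            files.push_back(entry.path().string());
        else if (entry.is_directory())
            directories.push_back(entry.path().string());
    }
}

int main()
{
    auto all = findAllFilesLike(".");
    std::vector<std::string> files, dirs;
    directoryContentsLike(".", files, dirs);
    std::cout << all.size() << " files total, " << files.size()
              << " files and " << dirs.size() << " directories at the top level\n";
}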

View File

@ -104,13 +104,6 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
return std::make_unique<WriteBufferFromFile>(path, buf_size, flags);
}
void LocalObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
{
fs::directory_iterator end_it;
for (auto it = fs::directory_iterator(path); it != end_it; ++it)
children.emplace_back(it->path().filename(), it->file_size());
}
void LocalObjectStorage::removeObject(const StoredObject & object)
{
/// For local object storage files are actually removed when "metadata" is removed.

View File

@ -45,8 +45,6 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
void removeObject(const StoredObject & object) override;
void removeObjects(const StoredObjects & objects) override;

View File

@ -12,7 +12,6 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
@ -33,194 +32,102 @@ const std::string & MetadataStorageFromPlainObjectStorage::getPath() const
{
return object_storage_root_path;
}
std::filesystem::path MetadataStorageFromPlainObjectStorage::getAbsolutePath(const std::string & path) const
{
return fs::path(object_storage_root_path) / path;
}
bool MetadataStorageFromPlainObjectStorage::exists(const std::string & path) const
{
auto object = StoredObject::create(*object_storage, fs::path(object_storage_root_path) / path);
auto object = StoredObject::create(*object_storage, getAbsolutePath(path));
return object_storage->exists(object);
}
bool MetadataStorageFromPlainObjectStorage::isFile(const std::string & path) const
{
/// NOTE: This check is inaccurate and has excessive API calls
return !isDirectory(path) && exists(path);
return exists(path) && !isDirectory(path);
}
bool MetadataStorageFromPlainObjectStorage::isDirectory(const std::string & path) const
{
std::string directory = path;
std::string directory = getAbsolutePath(path);
trimRight(directory);
directory += "/";
/// NOTE: This check is far from ideal, since it works only if the directory
/// really has files, and it makes excessive API calls
RelativePathsWithSize children;
object_storage->listPrefix(directory, children);
return !children.empty();
}
Poco::Timestamp MetadataStorageFromPlainObjectStorage::getLastModified(const std::string &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getLastModified is not implemented for MetadataStorageFromPlainObjectStorage");
}
struct stat MetadataStorageFromPlainObjectStorage::stat(const std::string &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "stat is not implemented for MetadataStorageFromPlainObjectStorage");
}
time_t MetadataStorageFromPlainObjectStorage::getLastChanged(const std::string &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getLastChanged is not implemented for MetadataStorageFromPlainObjectStorage");
RelativePathsWithSize files;
std::vector<std::string> directories;
object_storage->getDirectoryContents(directory, files, directories);
return !files.empty() || !directories.empty();
}
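The caveat above exists because object storages have no real directories: a path counts as a directory only if some object lies under "<path>/". A toy sketch of that assumption, with a flat key-to-size map standing in for the storage and all names hypothetical.

#include <cstddef>
#include <iostream>
#include <map>
#include <string>

/// A "directory" exists only if some key lies under "<path>/", which is
/// exactly why the check above fails for directories with no objects in them.
bool isDirectoryLike(const std::map<std::string, std::size_t> & objects, std::string path)
{
    while (!path.empty() && path.back() == '/')
        path.pop_back();           /// trimRight(directory)
    path += '/';                   /// directory += "/"
    auto it = objects.lower_bound(path);
    return it != objects.end() && it->first.compare(0, path.size(), path) == 0;
}

int main()
{
    std::map<std::string, std::size_t> objects{{"data/part-0.bin", 10}};
    std::cout << isDirectoryLike(objects, "data") << '\n';   /// 1: has a child object
    std::cout << isDirectoryLike(objects, "logs") << '\n';   /// 0: no keys under logs/
}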
uint64_t MetadataStorageFromPlainObjectStorage::getFileSize(const String & path) const
{
RelativePathsWithSize children;
object_storage->listPrefix(path, children);
object_storage->findAllFiles(getAbsolutePath(path), children);
if (children.empty())
return 0;
if (children.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "listPrefix() return multiple paths ({}) for {}", children.size(), path);
throw Exception(ErrorCodes::LOGICAL_ERROR, "findAllFiles() return multiple paths ({}) for {}", children.size(), path);
return children.front().bytes_size;
}
std::vector<std::string> MetadataStorageFromPlainObjectStorage::listDirectory(const std::string & path) const
{
RelativePathsWithSize children;
object_storage->listPrefix(path, children);
RelativePathsWithSize files;
std::vector<std::string> directories;
object_storage->getDirectoryContents(getAbsolutePath(path), files, directories);
std::vector<std::string> result;
for (const auto & path_size : children)
{
for (const auto & path_size : files)
result.push_back(path_size.relative_path);
}
for (const auto & directory : directories)
result.push_back(directory);
return result;
}
DirectoryIteratorPtr MetadataStorageFromPlainObjectStorage::iterateDirectory(const std::string & path) const
{
/// NOTE: this is not required for BACKUP/RESTORE, but this is a first step
/// towards MergeTree on plain S3.
/// Required for MergeTree
auto paths = listDirectory(path);
std::vector<std::filesystem::path> fs_paths(paths.begin(), paths.end());
return std::make_unique<StaticDirectoryIterator>(std::move(fs_paths));
}
std::string MetadataStorageFromPlainObjectStorage::readFileToString(const std::string &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "readFileToString is not implemented for MetadataStorageFromPlainObjectStorage");
}
std::unordered_map<String, String> MetadataStorageFromPlainObjectStorage::getSerializedMetadata(const std::vector<String> &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getSerializedMetadata is not implemented for MetadataStorageFromPlainObjectStorage");
}
StoredObjects MetadataStorageFromPlainObjectStorage::getStorageObjects(const std::string & path) const
{
std::string blob_name = object_storage->generateBlobNameForPath(path);
std::string object_path = fs::path(object_storage_root_path) / blob_name;
size_t object_size = getFileSize(object_path);
auto object = StoredObject::create(*object_storage, object_path, object_size, /* exists */true);
size_t object_size = getFileSize(blob_name);
auto object = StoredObject::create(*object_storage, getAbsolutePath(blob_name), object_size, /* exists */true);
return {std::move(object)};
}
uint32_t MetadataStorageFromPlainObjectStorage::getHardlinkCount(const std::string &) const
{
return 1;
}
const IMetadataStorage & MetadataStorageFromPlainObjectStorageTransaction::getStorageForNonTransactionalReads() const
{
return metadata_storage;
}
void MetadataStorageFromPlainObjectStorageTransaction::writeStringToFile(const std::string &, const std::string & /* data */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "writeStringToFile is not implemented for MetadataStorageFromPlainObjectStorage");
}
void MetadataStorageFromPlainObjectStorageTransaction::setLastModified(const std::string &, const Poco::Timestamp &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "setLastModified is not implemented for MetadataStorageFromPlainObjectStorage");
}
void MetadataStorageFromPlainObjectStorageTransaction::unlinkFile(const std::string & path)
{
auto object = StoredObject::create(*metadata_storage.object_storage, fs::path(metadata_storage.object_storage_root_path) / path);
auto object = StoredObject::create(*metadata_storage.object_storage, metadata_storage.getAbsolutePath(path));
metadata_storage.object_storage->removeObject(object);
}
void MetadataStorageFromPlainObjectStorageTransaction::removeRecursive(const std::string &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "removeRecursive is not implemented for MetadataStorageFromPlainObjectStorage");
}
void MetadataStorageFromPlainObjectStorageTransaction::createDirectory(const std::string &)
{
/// Noop. It is an Object Storage, not a filesystem.
}
void MetadataStorageFromPlainObjectStorageTransaction::createDirectoryRecursive(const std::string &)
{
/// Noop. It is an Object Storage, not a filesystem.
}
void MetadataStorageFromPlainObjectStorageTransaction::removeDirectory(const std::string &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "removeDirectory is not implemented for MetadataStorageFromPlainObjectStorage");
}
void MetadataStorageFromPlainObjectStorageTransaction::moveFile(const std::string & /* path_from */, const std::string & /* path_to */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "moveFile is not implemented for MetadataStorageFromPlainObjectStorage");
}
void MetadataStorageFromPlainObjectStorageTransaction::moveDirectory(const std::string & /* path_from */, const std::string & /* path_to */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "moveDirectory is not implemented for MetadataStorageFromPlainObjectStorage");
}
void MetadataStorageFromPlainObjectStorageTransaction::replaceFile(const std::string & /* path_from */, const std::string & /* path_to */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "replaceFile is not implemented for MetadataStorageFromPlainObjectStorage");
}
void MetadataStorageFromPlainObjectStorageTransaction::chmod(const String &, mode_t)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "chmod is not implemented for MetadataStorageFromPlainObjectStorage");
}
void MetadataStorageFromPlainObjectStorageTransaction::setReadOnly(const std::string &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "setReadOnly is not implemented for MetadataStorageFromPlainObjectStorage");
}
void MetadataStorageFromPlainObjectStorageTransaction::createHardLink(const std::string & /* path_from */, const std::string & /* path_to */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "createHardLink is not implemented for MetadataStorageFromPlainObjectStorage");
}
void MetadataStorageFromPlainObjectStorageTransaction::createEmptyMetadataFile(const std::string &)
{
/// Noop, no separate metadata.
}
void MetadataStorageFromPlainObjectStorageTransaction::createMetadataFile(
const std::string &, const std::string & /* blob_name */, uint64_t /* size_in_bytes */)
{
/// Noop, no separate metadata.
}
void MetadataStorageFromPlainObjectStorageTransaction::addBlobToMetadata(
const std::string &, const std::string & /* blob_name */, uint64_t /* size_in_bytes */)
{
/// Noop, the local metadata is only one file, and it is the metadata file itself.
}
void MetadataStorageFromPlainObjectStorageTransaction::unlinkMetadata(const std::string &)
{
/// Noop, no separate metadata.

View File

@ -45,31 +45,32 @@ public:
uint64_t getFileSize(const String & path) const override;
Poco::Timestamp getLastModified(const std::string & path) const override;
time_t getLastChanged(const std::string & path) const override;
bool supportsChmod() const override { return false; }
bool supportsStat() const override { return false; }
struct stat stat(const String & path) const override;
std::vector<std::string> listDirectory(const std::string & path) const override;
DirectoryIteratorPtr iterateDirectory(const std::string & path) const override;
std::string readFileToString(const std::string & path) const override;
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
uint32_t getHardlinkCount(const std::string & path) const override;
DiskPtr getDisk() const { return {}; }
StoredObjects getStorageObjects(const std::string & path) const override;
std::string getObjectStorageRootPath() const override { return object_storage_root_path; }
Poco::Timestamp getLastModified(const std::string & /* path */) const override
{
/// Required by MergeTree
return {};
}
uint32_t getHardlinkCount(const std::string & /* path */) const override
{
return 1;
}
bool supportsChmod() const override { return false; }
bool supportsStat() const override { return false; }
private:
std::filesystem::path getAbsolutePath(const std::string & path) const;
};
class MetadataStorageFromPlainObjectStorageTransaction final : public IMetadataTransaction
@ -83,47 +84,34 @@ public:
: metadata_storage(metadata_storage_)
{}
~MetadataStorageFromPlainObjectStorageTransaction() override = default;
const IMetadataStorage & getStorageForNonTransactionalReads() const final;
void commit() final {}
void writeStringToFile(const std::string & path, const std::string & data) override;
void createEmptyMetadataFile(const std::string & path) override;
void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
const IMetadataStorage & getStorageForNonTransactionalReads() const override;
void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
void createEmptyMetadataFile(const std::string & /* path */) override
{
/// No metadata, no need to create anything.
}
bool supportsChmod() const override { return false; }
void chmod(const String & path, mode_t mode) override;
void setReadOnly(const std::string & path) override;
void unlinkFile(const std::string & path) override;
void createMetadataFile(const std::string & /* path */, const std::string & /* blob_name */, uint64_t /* size_in_bytes */) override
{
/// Noop
}
void createDirectory(const std::string & path) override;
void createDirectoryRecursive(const std::string & path) override;
void removeDirectory(const std::string & path) override;
void removeRecursive(const std::string & path) override;
void createHardLink(const std::string & path_from, const std::string & path_to) override;
void moveFile(const std::string & path_from, const std::string & path_to) override;
void moveDirectory(const std::string & path_from, const std::string & path_to) override;
void replaceFile(const std::string & path_from, const std::string & path_to) override;
void unlinkFile(const std::string & path) override;
void unlinkMetadata(const std::string & path) override;
void commit() override
{
/// Nothing to commit.
}
bool supportsChmod() const override { return false; }
};
}

View File

@ -28,7 +28,7 @@
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <Common/getRandomASCIIString.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/logger_useful.h>
#include <Common/MultiVersion.h>
@ -248,7 +248,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
std::move(s3_buffer), std::move(finalize_callback), object.absolute_path);
}
void S3ObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
void S3ObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize & children) const
{
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
@ -279,6 +279,49 @@ void S3ObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize
} while (outcome.GetResult().GetIsTruncated());
}
void S3ObjectStorage::getDirectoryContents(const std::string & path,
RelativePathsWithSize & files,
std::vector<std::string> & directories) const
{
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(bucket);
request.SetPrefix(path);
request.SetMaxKeys(settings_ptr->list_object_keys_size);
request.SetDelimiter("/");
Aws::S3::Model::ListObjectsV2Outcome outcome;
do
{
ProfileEvents::increment(ProfileEvents::S3ListObjects);
ProfileEvents::increment(ProfileEvents::DiskS3ListObjects);
outcome = client_ptr->ListObjectsV2(request);
throwIfError(outcome);
auto result = outcome.GetResult();
auto result_objects = result.GetContents();
auto result_common_prefixes = result.GetCommonPrefixes();
if (result_objects.empty() && result_common_prefixes.empty())
break;
for (const auto & object : result_objects)
files.emplace_back(object.GetKey(), object.GetSize());
for (const auto & common_prefix : result_common_prefixes)
{
std::string directory = common_prefix.GetPrefix();
/// Make it compatible with std::filesystem::path::filename()
trimRight(directory, '/');
directories.emplace_back(directory);
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
} while (outcome.GetResult().GetIsTruncated());
}
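The trimRight of the trailing delimiter above matters because S3 returns CommonPrefixes with the delimiter attached (e.g. "store/abc/"), and std::filesystem::path::filename() on a trailing-slash path is empty. A minimal standalone check of just that std::filesystem behavior, independent of the S3 API.

#include <filesystem>
#include <iostream>

int main()
{
    namespace fs = std::filesystem;
    /// S3 common prefixes come back with the trailing delimiter, e.g. "store/abc/".
    std::cout << fs::path("store/abc/").filename() << '\n'; /// prints "" - filename is empty
    std::cout << fs::path("store/abc").filename() << '\n';  /// prints "abc" - after trimming '/'
}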
void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)
{
auto client_ptr = client.get();

View File

@ -105,7 +105,10 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
void findAllFiles(const std::string & path, RelativePathsWithSize & children) const override;
void getDirectoryContents(const std::string & path,
RelativePathsWithSize & files,
std::vector<std::string> & directories) const override;
/// Uses `DeleteObjectRequest`.
void removeObject(const StoredObject & object) override;

View File

@ -12,7 +12,6 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int FILE_DOESNT_EXIST;
extern const int NETWORK_ERROR;
}
@ -168,91 +167,11 @@ DirectoryIteratorPtr MetadataStorageFromStaticFilesWebServer::iterateDirectory(c
return std::make_unique<StaticDirectoryIterator>(std::move(dir_file_paths));
}
std::string MetadataStorageFromStaticFilesWebServer::readFileToString(const std::string &) const
{
WebObjectStorage::throwNotAllowed();
}
Poco::Timestamp MetadataStorageFromStaticFilesWebServer::getLastModified(const std::string &) const
{
return {};
}
time_t MetadataStorageFromStaticFilesWebServer::getLastChanged(const std::string &) const
{
return {};
}
uint32_t MetadataStorageFromStaticFilesWebServer::getHardlinkCount(const std::string &) const
{
return 1;
}
const IMetadataStorage & MetadataStorageFromStaticFilesWebServerTransaction::getStorageForNonTransactionalReads() const
{
return metadata_storage;
}
void MetadataStorageFromStaticFilesWebServerTransaction::writeStringToFile(const std::string &, const std::string &)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::setLastModified(const std::string &, const Poco::Timestamp &)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::unlinkFile(const std::string &)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::removeRecursive(const std::string &)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::removeDirectory(const std::string &)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::moveFile(const std::string &, const std::string &)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::moveDirectory(const std::string &, const std::string &)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::replaceFile(const std::string &, const std::string &)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::setReadOnly(const std::string &)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::createHardLink(const std::string &, const std::string &)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::addBlobToMetadata(const std::string &, const std::string &, uint64_t)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::unlinkMetadata(const std::string &)
{
WebObjectStorage::throwNotAllowed();
}
void MetadataStorageFromStaticFilesWebServerTransaction::createDirectory(const std::string &)
{
/// Noop.
@ -263,30 +182,4 @@ void MetadataStorageFromStaticFilesWebServerTransaction::createDirectoryRecursiv
/// Noop.
}
void MetadataStorageFromStaticFilesWebServerTransaction::createEmptyMetadataFile(const std::string & /* path */)
{
/// Noop.
}
void MetadataStorageFromStaticFilesWebServerTransaction::createMetadataFile(
const std::string & /* path */, const std::string & /* blob_name */, uint64_t /* size_in_bytes */)
{
/// Noop.
}
void MetadataStorageFromStaticFilesWebServerTransaction::commit()
{
/// Noop.
}
std::unordered_map<String, String> MetadataStorageFromStaticFilesWebServer::getSerializedMetadata(const std::vector<String> &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getSerializedMetadata is not implemented for MetadataStorageFromStaticFilesWebServer");
}
void MetadataStorageFromStaticFilesWebServerTransaction::chmod(const String &, mode_t)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "chmod is not implemented for MetadataStorageFromStaticFilesWebServer");
}
}

View File

@ -36,29 +36,28 @@ public:
uint64_t getFileSize(const String & path) const override;
Poco::Timestamp getLastModified(const std::string & path) const override;
time_t getLastChanged(const std::string & path) const override;
std::vector<std::string> listDirectory(const std::string & path) const override;
DirectoryIteratorPtr iterateDirectory(const std::string & path) const override;
std::string readFileToString(const std::string & path) const override;
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
uint32_t getHardlinkCount(const std::string & path) const override;
StoredObjects getStorageObjects(const std::string & path) const override;
std::string getObjectStorageRootPath() const override { return ""; }
struct stat stat(const String & /* path */) const override { return {}; }
Poco::Timestamp getLastModified(const std::string & /* path */) const override
{
/// Required by MergeTree
return {};
}
uint32_t getHardlinkCount(const std::string & /* path */) const override
{
return 1;
}
bool supportsChmod() const override { return false; }
bool supportsStat() const override { return false; }
struct stat stat(const String &) const override { return {}; }
};
class MetadataStorageFromStaticFilesWebServerTransaction final : public IMetadataTransaction
@ -73,47 +72,28 @@ public:
: metadata_storage(metadata_storage_)
{}
~MetadataStorageFromStaticFilesWebServerTransaction() override = default;
const IMetadataStorage & getStorageForNonTransactionalReads() const override;
void commit() override;
void createEmptyMetadataFile(const std::string & /* path */) override
{
/// No metadata, no need to create anything.
}
void writeStringToFile(const std::string & path, const std::string & data) override;
void createEmptyMetadataFile(const std::string & path) override;
void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
void setReadOnly(const std::string & path) override;
void unlinkFile(const std::string & path) override;
void createMetadataFile(const std::string & /* path */, const std::string & /* blob_name */, uint64_t /* size_in_bytes */) override
{
/// Noop
}
void createDirectory(const std::string & path) override;
void createDirectoryRecursive(const std::string & path) override;
void removeDirectory(const std::string & path) override;
void removeRecursive(const std::string & path) override;
void createHardLink(const std::string & path_from, const std::string & path_to) override;
void moveFile(const std::string & path_from, const std::string & path_to) override;
void moveDirectory(const std::string & path_from, const std::string & path_to) override;
void replaceFile(const std::string & path_from, const std::string & path_to) override;
void unlinkMetadata(const std::string & path) override;
void commit() override
{
/// Nothing to commit.
}
bool supportsChmod() const override { return false; }
void chmod(const String &, mode_t) override;
};
}

View File

@ -178,17 +178,6 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
}
}
void WebObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
{
for (const auto & [file_path, file_info] : files)
{
if (file_info.type == FileType::File && file_path.starts_with(path))
{
children.emplace_back(file_path, file_info.size);
}
}
}
void WebObjectStorage::throwNotAllowed()
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only read-only operations are supported");

View File

@ -55,8 +55,6 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
void removeObject(const StoredObject & object) override;
void removeObjects(const StoredObjects & objects) override;

View File

@ -58,7 +58,7 @@ static void writeData(const ISerialization & serialization, const ColumnPtr & co
settings.low_cardinality_max_dictionary_size = 0; //-V1048
ISerialization::SerializeBinaryBulkStatePtr state;
serialization.serializeBinaryBulkStatePrefix(settings, state);
serialization.serializeBinaryBulkStatePrefix(*full_column, settings, state);
serialization.serializeBinaryBulkWithMultipleStreams(*full_column, offset, limit, settings, state);
serialization.serializeBinaryBulkStateSuffix(settings, state);
}

View File

@ -3360,9 +3360,8 @@ private:
{
return [] (ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable * nullable_source, size_t input_rows_count)
{
auto res = ConvertImplGenericFromString<ColumnString>::execute(arguments, result_type, nullable_source, input_rows_count);
auto & res_object = assert_cast<ColumnObject &>(res->assumeMutableRef());
res_object.finalize();
auto res = ConvertImplGenericFromString<ColumnString>::execute(arguments, result_type, nullable_source, input_rows_count)->assumeMutable();
res->finalize();
return res;
};
}

View File

@ -54,7 +54,7 @@ public:
auto serialization = elem.type->getDefaultSerialization();
serialization->serializeBinaryBulkStatePrefix(settings, state);
serialization->serializeBinaryBulkStatePrefix(*full_column, settings, state);
serialization->serializeBinaryBulkWithMultipleStreams(*full_column,
0 /** offset */, 0 /** limit */,
settings, state);

View File

@ -829,7 +829,7 @@ void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & creat
{
for (const auto & [name, type] : properties.columns.getAllPhysical())
{
if (isObject(type))
if (type->hasDynamicSubcolumns())
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"Cannot create table with column '{}' which type is '{}' "
@ -1398,7 +1398,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
/// we can safely destroy the object without a call to "shutdown", because there is guarantee
/// that no background threads/similar resources remain after exception from "startup".
if (!res->supportsDynamicSubcolumns() && hasObjectColumns(res->getInMemoryMetadataPtr()->getColumns()))
if (!res->supportsDynamicSubcolumns() && hasDynamicSubcolumns(res->getInMemoryMetadataPtr()->getColumns()))
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"Cannot create table with column of type Object, "

View File

@ -387,6 +387,9 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
}
else if (isObject(type))
{
if (src.getType() == Field::Types::Object)
return src; /// Already in needed type.
const auto * from_type_tuple = typeid_cast<const DataTypeTuple *>(from_type_hint);
if (src.getType() == Field::Types::Tuple && from_type_tuple && from_type_tuple->haveExplicitNames())
{

View File

@ -232,7 +232,9 @@ Chunk IRowInputFormat::generate()
return {};
}
finalizeObjectColumns(columns);
for (const auto & column : columns)
column->finalize();
Chunk chunk(std::move(columns), num_rows);
return chunk;
}

View File

@ -101,7 +101,9 @@ Chunk ValuesBlockInputFormat::generate()
return {};
}
finalizeObjectColumns(columns);
for (const auto & column : columns)
column->finalize();
size_t rows_in_block = columns[0]->size();
return Chunk{std::move(columns), rows_in_block};
}

View File

@ -180,6 +180,13 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
std::sregex_token_iterator(remote_fs_metadata.begin(), remote_fs_metadata.end(), re, -1),
std::sregex_token_iterator());
bool send_projections = client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION;
if (send_projections)
{
const auto & projections = part->getProjectionParts();
writeBinary(projections.size(), out);
}
if (data_settings->allow_remote_fs_zero_copy_replication &&
/// In-memory data part does not have metadata yet.
!isInMemoryPart(part) &&
@ -190,33 +197,15 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
{
/// Send metadata if the receiver's capability covers the source disk type.
response.addCookie({"remote_fs_metadata", disk_type});
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
{
const auto & projections = part->getProjectionParts();
writeBinary(projections.size(), out);
}
sendPartFromDiskRemoteMeta(part, out, true, part->getProjectionParts());
sendPartFromDiskRemoteMeta(part, out, true, send_projections);
return;
}
}
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
{
const auto & projections = part->getProjectionParts();
writeBinary(projections.size(), out);
if (isInMemoryPart(part))
sendPartFromMemory(part, out, projections);
sendPartFromMemory(part, out, send_projections);
else
sendPartFromDisk(part, out, client_protocol_version, projections);
}
else
{
if (isInMemoryPart(part))
sendPartFromMemory(part, out);
else
sendPartFromDisk(part, out, client_protocol_version);
}
sendPartFromDisk(part, out, client_protocol_version, send_projections);
}
catch (const NetException &)
{
@ -238,10 +227,12 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
}
void Service::sendPartFromMemory(
const MergeTreeData::DataPartPtr & part, WriteBuffer & out, const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections)
const MergeTreeData::DataPartPtr & part, WriteBuffer & out, bool send_projections)
{
auto metadata_snapshot = data.getInMemoryMetadataPtr();
for (const auto & [name, projection] : projections)
if (send_projections)
{
for (const auto & [name, projection] : part->getProjectionParts())
{
auto projection_sample_block = metadata_snapshot->projections.get(name).sample_block;
auto part_in_memory = asInMemoryPart(projection);
@ -253,6 +244,7 @@ void Service::sendPartFromMemory(
NativeWriter block_out(out, 0, projection_sample_block);
block_out.write(part_in_memory->block);
}
}
auto part_in_memory = asInMemoryPart(part);
if (!part_in_memory)
@ -269,7 +261,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
const MergeTreeData::DataPartPtr & part,
WriteBuffer & out,
int client_protocol_version,
const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections)
bool send_projections)
{
/// We'll take a list of files from the list of checksums.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
@ -277,7 +269,8 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
auto file_names_without_checksums = part->getFileNamesWithoutChecksums();
for (const auto & file_name : file_names_without_checksums)
{
if (client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION && file_name == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
if (client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION
&& file_name == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
continue;
checksums.files[file_name] = {};
@ -288,11 +281,10 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
{
// Get rid of projection files
checksums.files.erase(name + ".proj");
auto it = projections.find(name);
if (it != projections.end())
if (send_projections)
{
writeStringBinary(name, out);
MergeTreeData::DataPart::Checksums projection_checksum = sendPartFromDisk(it->second, out, client_protocol_version);
MergeTreeData::DataPart::Checksums projection_checksum = sendPartFromDisk(projection, out, client_protocol_version, false);
data_checksums.addFile(name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
}
else if (part->checksums.has(name + ".proj"))
@ -337,18 +329,15 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
return data_checksums;
}
MergeTreeData::DataPart::Checksums Service::sendPartFromDiskRemoteMeta(
void Service::sendPartFromDiskRemoteMeta(
const MergeTreeData::DataPartPtr & part,
WriteBuffer & out,
bool send_part_id,
const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections)
bool send_projections)
{
const auto * data_part_storage_on_disk = dynamic_cast<const DataPartStorageOnDisk *>(&part->getDataPartStorage());
if (!data_part_storage_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage '{}' doesn't support zero-copy replication", part->getDataPartStorage().getDiskName());
if (!data_part_storage_on_disk->supportZeroCopyReplication())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk '{}' doesn't support zero-copy replication", data_part_storage_on_disk->getDiskName());
auto data_part_storage = part->getDataPartStoragePtr();
if (!data_part_storage->supportZeroCopyReplication())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk '{}' doesn't support zero-copy replication", data_part_storage->getDiskName());
/// We'll take a list of files from the list of checksums.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
@ -369,30 +358,20 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDiskRemoteMeta(
paths.push_back(fs::path(part->getDataPartStorage().getRelativePath()) / it.first);
/// Serialized metadata files with zero ref counts.
auto metadatas = data_part_storage_on_disk->getSerializedMetadata(paths);
auto metadatas = data_part_storage->getSerializedMetadata(paths);
if (send_part_id)
{
String part_id = data_part_storage_on_disk->getUniqueId();
String part_id = data_part_storage->getUniqueId();
writeStringBinary(part_id, out);
}
MergeTreeData::DataPart::Checksums data_checksums;
if (send_projections)
{
for (const auto & [name, projection] : part->getProjectionParts())
{
auto it = projections.find(name);
if (it != projections.end())
{
writeStringBinary(name, out);
MergeTreeData::DataPart::Checksums projection_checksum = sendPartFromDiskRemoteMeta(it->second, out, false);
data_checksums.addFile(name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
}
else if (part->checksums.has(name + ".proj"))
{
// We don't send this projection, just add our checksum to bypass the following check
const auto & our_checksum = part->checksums.files.find(name + ".proj")->second;
data_checksums.addFile(name + ".proj", our_checksum.file_size, our_checksum.file_hash);
sendPartFromDiskRemoteMeta(projection, out, false, false);
}
}
@ -403,7 +382,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDiskRemoteMeta(
String file_path_prefix = fs::path(part->getDataPartStorage().getRelativePath()) / file_name;
/// Just some additional checks
String metadata_file_path = fs::path(data_part_storage_on_disk->getDiskPath()) / file_path_prefix;
String metadata_file_path = fs::path(data_part_storage->getDiskPath()) / file_path_prefix;
fs::path metadata(metadata_file_path);
if (!fs::exists(metadata))
throw Exception(ErrorCodes::CORRUPTED_DATA, "Remote metadata '{}' does not exist", file_name);
@ -427,12 +406,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDiskRemoteMeta(
throw Exception(ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART, "Unexpected size of file {}", metadata_file_path);
writePODBinary(hashing_out.getHash(), out);
if (!file_names_without_checksums.contains(file_name))
data_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
}
return data_checksums;
}
MergeTreeData::DataPartPtr Service::findPart(const String & name)
@ -707,20 +681,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
in->setNextCallback(ReplicatedFetchReadCallback(*entry));
return part_type == "InMemory"
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, *in, projections, throttler)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix, sync, disk, *in, projections, checksums, throttler);
}
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
const String & part_name,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
size_t projections,
ThrottlerPtr throttler)
if (part_type == "InMemory")
{
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
@ -729,46 +690,45 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
data.getRelativeDataPath(),
part_name);
MergeTreeData::MutableDataPartPtr new_data_part =
std::make_shared<MergeTreeDataPartInMemory>(data, part_name, data_part_storage);
return downloadPartToMemory(
data_part_storage, part_name,
MergeTreePartInfo::fromPartName(part_name, data.format_version),
part_uuid, metadata_snapshot, context, *in,
projections, false, throttler);
}
new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
return downloadPartToDisk(
part_name, replica_path, to_detached, tmp_prefix,
sync, disk, *in, projections, checksums, throttler);
}
for (auto i = 0ul; i < projections; ++i)
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
MutableDataPartStoragePtr data_part_storage,
const String & part_name,
const MergeTreePartInfo & part_info,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
PooledReadWriteBufferFromHTTP & in,
size_t projections,
bool is_projection,
ThrottlerPtr throttler)
{
auto new_data_part = std::make_shared<MergeTreeDataPartInMemory>(data, part_name, part_info, data_part_storage);
for (size_t i = 0; i < projections; ++i)
{
String projection_name;
readStringBinary(projection_name, in);
MergeTreeData::DataPart::Checksums checksums;
if (!checksums.read(in))
throw Exception("Cannot deserialize checksums", ErrorCodes::CORRUPTED_DATA);
NativeReader block_in(in, 0);
auto block = block_in.read();
throttler->add(block.bytes());
auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj");
MergeTreePartInfo new_part_info("all", 0, 0, 0);
MergeTreeData::MutableDataPartPtr new_projection_part =
std::make_shared<MergeTreeDataPartInMemory>(data, projection_name, new_part_info, projection_part_storage, new_data_part.get());
auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj");
new_projection_part->is_temp = false;
new_projection_part->setColumns(block.getNamesAndTypesList(), {});
MergeTreePartition partition{};
new_projection_part->partition = std::move(partition);
new_projection_part->minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
auto new_projection_part = downloadPartToMemory(
projection_part_storage, projection_name,
new_part_info, part_uuid, metadata_snapshot,
context, in, 0, true, throttler);
MergedBlockOutputStream part_out(
new_projection_part,
metadata_snapshot->projections.get(projection_name).metadata,
block.getNamesAndTypesList(),
{},
CompressionCodecFactory::instance().get("NONE", {}),
NO_TRANSACTION_PTR);
part_out.write(block);
part_out.finalizePart(new_projection_part, false);
new_projection_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);
new_data_part->addProjectionPart(projection_name, std::move(new_projection_part));
}
@ -780,11 +740,16 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
auto block = block_in.read();
throttler->add(block.bytes());
new_data_part->setColumns(block.getNamesAndTypesList(), {});
if (!is_projection)
{
new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
new_data_part->uuid = part_uuid;
new_data_part->is_temp = true;
new_data_part->setColumns(block.getNamesAndTypesList(), {});
new_data_part->minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
new_data_part->partition.create(metadata_snapshot, block, 0, context);
}
MergedBlockOutputStream part_out(
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {},
@ -850,7 +815,6 @@ void Fetcher::downloadBasePartOrProjectionPartToDiskRemoteMeta(
checksums.addFile(file_name, file_size, expected_hash);
}
}
}
@ -966,11 +930,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
SyncGuardPtr sync_guard;
if (data.getSettings()->fsync_part_directory)
sync_guard = disk->getDirectorySyncGuard(data_part_storage->getRelativePath());
sync_guard = data_part_storage->getDirectorySyncGuard();
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
for (auto i = 0ul; i < projections; ++i)
for (size_t i = 0; i < projections; ++i)
{
String projection_name;
readStringBinary(projection_name, in);
@ -1043,7 +1007,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
data_part_storage->createDirectories();
for (auto i = 0ul; i < projections; ++i)
for (size_t i = 0; i < projections; ++i)
{
String projection_name;
readStringBinary(projection_name, in);
@ -1071,7 +1035,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
new_data_part->is_temp = true;
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);
}
#if USE_AWS_S3

View File

@ -1,5 +1,6 @@
#pragma once
#include "Storages/MergeTree/MergeTreePartInfo.h"
#include <Interpreters/InterserverIOHandler.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/IStorage_fwd.h>
@ -42,19 +43,19 @@ private:
void sendPartFromMemory(
const MergeTreeData::DataPartPtr & part,
WriteBuffer & out,
const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections = {});
bool send_projections);
MergeTreeData::DataPart::Checksums sendPartFromDisk(
const MergeTreeData::DataPartPtr & part,
WriteBuffer & out,
int client_protocol_version,
const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections = {});
bool send_projections);
MergeTreeData::DataPart::Checksums sendPartFromDiskRemoteMeta(
void sendPartFromDiskRemoteMeta(
const MergeTreeData::DataPartPtr & part,
WriteBuffer & out,
bool send_part_id,
const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections = {});
bool send_projections);
/// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish,
/// so Service will never access a dangling reference to the storage
@ -120,13 +121,15 @@ private:
ThrottlerPtr throttler);
MergeTreeData::MutableDataPartPtr downloadPartToMemory(
MutableDataPartStoragePtr data_part_storage,
const String & part_name,
const MergeTreePartInfo & part_info,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
size_t projections,
bool is_projection,
ThrottlerPtr throttler);
MergeTreeData::MutableDataPartPtr downloadPartToDiskRemoteMeta(

View File

@ -153,7 +153,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical();
global_ctx->storage_columns = global_ctx->metadata_snapshot->getColumns().getAllPhysical();
auto object_columns = MergeTreeData::getObjectColumns(global_ctx->future_part->parts, global_ctx->metadata_snapshot->getColumns());
auto object_columns = MergeTreeData::getConcreteObjectColumns(global_ctx->future_part->parts, global_ctx->metadata_snapshot->getColumns());
global_ctx->storage_snapshot = std::make_shared<StorageSnapshot>(*global_ctx->data, global_ctx->metadata_snapshot, object_columns);
extendObjectColumns(global_ctx->storage_columns, object_columns, false);

View File

@ -7124,18 +7124,18 @@ ReservationPtr MergeTreeData::balancedReservation(
return reserved_space;
}
ColumnsDescription MergeTreeData::getObjectColumns(
ColumnsDescription MergeTreeData::getConcreteObjectColumns(
const DataPartsVector & parts, const ColumnsDescription & storage_columns)
{
return DB::getObjectColumns(
return DB::getConcreteObjectColumns(
parts.begin(), parts.end(),
storage_columns, [](const auto & part) -> const auto & { return part->getColumns(); });
}
ColumnsDescription MergeTreeData::getObjectColumns(
ColumnsDescription MergeTreeData::getConcreteObjectColumns(
boost::iterator_range<DataPartIteratorByStateAndInfo> range, const ColumnsDescription & storage_columns)
{
return DB::getObjectColumns(
return DB::getConcreteObjectColumns(
range.begin(), range.end(),
storage_columns, [](const auto & part) -> const auto & { return part->getColumns(); });
}
@ -7144,21 +7144,21 @@ void MergeTreeData::resetObjectColumnsFromActiveParts(const DataPartsLock & /*lo
{
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto & columns = metadata_snapshot->getColumns();
if (!hasObjectColumns(columns))
if (!hasDynamicSubcolumns(columns))
return;
auto range = getDataPartsStateRange(DataPartState::Active);
object_columns = getObjectColumns(range, columns);
object_columns = getConcreteObjectColumns(range, columns);
}
void MergeTreeData::updateObjectColumns(const DataPartPtr & part, const DataPartsLock & /*lock*/)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto & columns = metadata_snapshot->getColumns();
if (!hasObjectColumns(columns))
if (!hasDynamicSubcolumns(columns))
return;
DB::updateObjectColumns(object_columns, part->getColumns());
DB::updateObjectColumns(object_columns, columns, part->getColumns());
}
StorageSnapshotPtr MergeTreeData::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const

View File

@ -779,10 +779,10 @@ public:
return column_sizes;
}
const ColumnsDescription & getObjectColumns() const { return object_columns; }
const ColumnsDescription & getConcreteObjectColumns() const { return object_columns; }
/// Creates description of columns of data type Object from the range of data parts.
static ColumnsDescription getObjectColumns(
static ColumnsDescription getConcreteObjectColumns(
const DataPartsVector & parts, const ColumnsDescription & storage_columns);
IndexSizeByName getSecondaryIndexSizes() const override
@ -1151,7 +1151,7 @@ protected:
}
/// Creates description of columns of data type Object from the range of data parts.
static ColumnsDescription getObjectColumns(
static ColumnsDescription getConcreteObjectColumns(
boost::iterator_range<DataPartIteratorByStateAndInfo> range, const ColumnsDescription & storage_columns);
std::optional<UInt64> totalRowsByPartitionPredicateImpl(

View File

@ -131,7 +131,7 @@ void writeColumnSingleGranule(
serialize_settings.position_independent_encoding = true; //-V1048
serialize_settings.low_cardinality_max_dictionary_size = 0; //-V1048
serialization->serializeBinaryBulkStatePrefix(serialize_settings, state);
serialization->serializeBinaryBulkStatePrefix(*column.column, serialize_settings, state);
serialization->serializeBinaryBulkWithMultipleStreams(*column.column, from_row, number_of_rows, serialize_settings, state);
serialization->serializeBinaryBulkStateSuffix(serialize_settings, state);
}

View File

@ -355,7 +355,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
{
ISerialization::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = createStreamGetter(name_and_type, offset_columns);
serialization->serializeBinaryBulkStatePrefix(serialize_settings, it->second);
serialization->serializeBinaryBulkStatePrefix(column, serialize_settings, it->second);
}
const auto & global_settings = storage.getContext()->getSettingsRef();

View File

@ -288,7 +288,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
auto columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
for (auto & column : columns)
if (isObject(column.type))
if (column.type->hasDynamicSubcolumns())
column.type = block.getByName(column.name).type;
static const String TMP_PREFIX = "tmp_insert_";
@ -480,8 +480,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
const String & part_name,
MergeTreeDataPartType part_type,
const String & relative_path,
bool is_temp,
IMergeTreeDataPart * parent_part,
const MergeTreeData & data,
@ -493,6 +491,21 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
const StorageMetadataPtr & metadata_snapshot = projection.metadata;
MergeTreePartInfo new_part_info("all", 0, 0, 0);
MergeTreeDataPartType part_type;
if (parent_part->getType() == MergeTreeDataPartType::InMemory)
{
part_type = MergeTreeDataPartType::InMemory;
}
else
{
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
// just check if there is enough space on parent volume
data.reserveSpace(expected_size, parent_part->getDataPartStorage());
part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
}
auto relative_path = part_name + (is_temp ? ".tmp_proj" : ".proj");
auto projection_part_storage = parent_part->getDataPartStorage().getProjection(relative_path);
auto new_data_part = data.createPart(
part_name,
@ -583,77 +596,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
}
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPart(
MergeTreeData & data,
Poco::Logger * log,
Block block,
const ProjectionDescription & projection,
IMergeTreeDataPart * parent_part)
{
String part_name = projection.name;
MergeTreeDataPartType part_type;
if (parent_part->getType() == MergeTreeDataPartType::InMemory)
{
part_type = MergeTreeDataPartType::InMemory;
}
else
{
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
// just check if there is enough space on parent volume
data.reserveSpace(expected_size, parent_part->getDataPartStorage());
part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
}
return writeProjectionPartImpl(
part_name,
part_type,
part_name + ".proj" /* relative_path */,
false /* is_temp */,
parent_part,
data,
log,
block,
projection);
}
/// This is used for the projection materialization process, which may contain multiple stages of
/// projection part merges.
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempProjectionPart(
MergeTreeData & data,
Poco::Logger * log,
Block block,
const ProjectionDescription & projection,
IMergeTreeDataPart * parent_part,
size_t block_num)
{
String part_name = fmt::format("{}_{}", projection.name, block_num);
MergeTreeDataPartType part_type;
if (parent_part->getType() == MergeTreeDataPartType::InMemory)
{
part_type = MergeTreeDataPartType::InMemory;
}
else
{
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
// just check if there is enough space on parent volume
data.reserveSpace(expected_size, parent_part->getDataPartStorage());
part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
}
return writeProjectionPartImpl(
part_name,
part_type,
part_name + ".tmp_proj" /* relative_path */,
true /* is_temp */,
parent_part,
data,
log,
block,
projection);
}
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeInMemoryProjectionPart(
const MergeTreeData & data,
Poco::Logger * log,
Block block,
@ -662,13 +604,32 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeInMemoryProjectionP
{
return writeProjectionPartImpl(
projection.name,
MergeTreeDataPartType::InMemory,
projection.name + ".proj" /* relative_path */,
false /* is_temp */,
parent_part,
data,
log,
block,
std::move(block),
projection);
}
/// This is used for the projection materialization process, which may contain multiple stages of
/// projection part merges.
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempProjectionPart(
const MergeTreeData & data,
Poco::Logger * log,
Block block,
const ProjectionDescription & projection,
IMergeTreeDataPart * parent_part,
size_t block_num)
{
String part_name = fmt::format("{}_{}", projection.name, block_num);
return writeProjectionPartImpl(
part_name,
true /* is_temp */,
parent_part,
data,
log,
std::move(block),
projection);
}

View File

@ -73,7 +73,7 @@ public:
/// For insertion.
static TemporaryPart writeProjectionPart(
MergeTreeData & data,
const MergeTreeData & data,
Poco::Logger * log,
Block block,
const ProjectionDescription & projection,
@ -81,21 +81,13 @@ public:
/// For mutation: MATERIALIZE PROJECTION.
static TemporaryPart writeTempProjectionPart(
MergeTreeData & data,
const MergeTreeData & data,
Poco::Logger * log,
Block block,
const ProjectionDescription & projection,
IMergeTreeDataPart * parent_part,
size_t block_num);
/// For WriteAheadLog AddPart.
static TemporaryPart writeInMemoryProjectionPart(
const MergeTreeData & data,
Poco::Logger * log,
Block block,
const ProjectionDescription & projection,
IMergeTreeDataPart * parent_part);
static Block mergeBlock(
const Block & block,
SortDescription sort_description,
@ -106,8 +98,6 @@ public:
private:
static TemporaryPart writeProjectionPartImpl(
const String & part_name,
MergeTreeDataPartType part_type,
const String & relative_path,
bool is_temp,
IMergeTreeDataPart * parent_part,
const MergeTreeData & data,
@ -116,7 +106,6 @@ private:
const ProjectionDescription & projection);
MergeTreeData & data;
Poco::Logger * log;
};

View File

@ -74,8 +74,9 @@ void MergeTreeIndexGranuleSet::serializeBinary(WriteBuffer & ostr) const
auto serialization = type->getDefaultSerialization();
ISerialization::SerializeBinaryBulkStatePtr state;
serialization->serializeBinaryBulkStatePrefix(settings, state);
serialization->serializeBinaryBulkWithMultipleStreams(*block.getByPosition(i).column, 0, size(), settings, state);
const auto & column = *block.getByPosition(i).column;
serialization->serializeBinaryBulkStatePrefix(column, settings, state);
serialization->serializeBinaryBulkWithMultipleStreams(column, 0, size(), settings, state);
serialization->serializeBinaryBulkStateSuffix(settings, state);
}
}

View File

@ -1,8 +1,8 @@
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/StorageMergeTree.h>
#include <DataTypes/ObjectUtils.h>
#include <Interpreters/PartLog.h>
#include <DataTypes/ObjectUtils.h>
namespace ProfileEvents
{
@ -56,8 +56,9 @@ struct MergeTreeSink::DelayedChunk
void MergeTreeSink::consume(Chunk chunk)
{
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
if (!storage_snapshot->object_columns.empty())
convertDynamicColumnsToTuples(block, storage_snapshot);
deduceTypesOfObjectColumns(storage_snapshot, block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
using DelayedPartitions = std::vector<MergeTreeSink::DelayedChunk::Partition>;

View File

@ -237,7 +237,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(
for (const auto & projection : metadata_snapshot->getProjections())
{
auto projection_block = projection.calculate(block, context);
auto temp_part = MergeTreeDataWriter::writeInMemoryProjectionPart(storage, log, projection_block, projection, part.get());
auto temp_part = MergeTreeDataWriter::writeProjectionPart(storage, log, projection_block, projection, part.get());
temp_part.finalize();
if (projection_block.rows())
part->addProjectionPart(projection.name, std::move(temp_part.part));

View File

@ -1,10 +1,10 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <DataTypes/ObjectUtils.h>
#include <Interpreters/PartLog.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <DataTypes/ObjectUtils.h>
#include <Core/Block.h>
#include <IO/Operators.h>
@ -165,7 +165,9 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
*/
size_t replicas_num = checkQuorumPrecondition(zookeeper);
deduceTypesOfObjectColumns(storage_snapshot, block);
if (!storage_snapshot->object_columns.empty())
convertDynamicColumnsToTuples(block, storage_snapshot);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
using DelayedPartitions = std::vector<ReplicatedMergeTreeSink::DelayedChunk::Partition>;

View File

@ -47,10 +47,10 @@ public:
const StorageMetadataPtr & metadata_snapshot, ContextPtr /*query_context*/) const override
{
const auto & storage_columns = metadata_snapshot->getColumns();
if (!hasObjectColumns(storage_columns))
if (!hasDynamicSubcolumns(storage_columns))
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot);
auto object_columns = getObjectColumns(
auto object_columns = getConcreteObjectColumns(
parts.begin(), parts.end(),
storage_columns, [](const auto & part) -> const auto & { return part->getColumns(); });
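The pair of renames in this hunk, hasObjectColumns to hasDynamicSubcolumns and getObjectColumns to getConcreteObjectColumns, recurs in the StorageDistributed and StorageMemory hunks below. A condensed sketch of the getStorageSnapshot pattern after the rename, using only identifiers that appear in these hunks (the final return is an assumption, since the hunk ends before it):

// Hedged sketch of the recurring snapshot pattern after the rename.
const auto & storage_columns = metadata_snapshot->getColumns();
if (!hasDynamicSubcolumns(storage_columns))      // was: hasObjectColumns
    return std::make_shared<StorageSnapshot>(*this, metadata_snapshot);

auto object_columns = getConcreteObjectColumns(  // was: getObjectColumns
    parts.begin(), parts.end(), storage_columns,
    [](const auto & part) -> const auto & { return part->getColumns(); });

// Assumed: the snapshot is then built with the deduced concrete object columns.
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, object_columns);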

View File

@ -112,98 +112,17 @@ IMergeTreeDataPart::Checksums checkDataPart(
};
/// This function calculates only checksum of file content (compressed or uncompressed).
/// It also calculates checksum of projections.
auto checksum_file = [&](const String & file_name)
{
if (data_part_storage.isDirectory(file_name) && endsWith(file_name, ".proj"))
{
auto projection_name = file_name.substr(0, file_name.size() - sizeof(".proj") + 1);
auto pit = data_part->getProjectionParts().find(projection_name);
if (pit == data_part->getProjectionParts().end())
{
if (require_checksums)
throw Exception("Unexpected file " + file_name + " in data part", ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART);
else
return;
}
const auto & projection = pit->second;
IMergeTreeDataPart::Checksums projection_checksums_data;
auto projection_part_storage = data_part_storage.getProjection(file_name);
if (projection->getType() == MergeTreeDataPartType::Compact)
{
auto file_buf = projection_part_storage->readFile(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION, {}, std::nullopt, std::nullopt);
HashingReadBuffer hashing_buf(*file_buf);
hashing_buf.ignoreAll();
projection_checksums_data.files[MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION]
= IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
}
else
{
const NamesAndTypesList & projection_columns_list = projection->getColumns();
for (const auto & projection_column : projection_columns_list)
{
get_serialization(projection_column)->enumerateStreams(
[&](const ISerialization::SubstreamPath & substream_path)
{
String projection_file_name = ISerialization::getFileNameForStream(projection_column, substream_path) + ".bin";
projection_checksums_data.files[projection_file_name] = checksum_compressed_file(*projection_part_storage, projection_file_name);
});
}
}
IMergeTreeDataPart::Checksums projection_checksums_txt;
if (require_checksums || projection_part_storage->exists("checksums.txt"))
{
auto buf = projection_part_storage->readFile("checksums.txt", {}, std::nullopt, std::nullopt);
projection_checksums_txt.read(*buf);
assertEOF(*buf);
}
const auto & projection_checksum_files_txt = projection_checksums_txt.files;
for (auto projection_it = projection_part_storage->iterate(); projection_it->isValid(); projection_it->next())
{
const String & projection_file_name = projection_it->name();
auto projection_checksum_it = projection_checksums_data.files.find(projection_file_name);
/// Skip files that we already calculated. Also skip metadata files that are not checksummed.
if (projection_checksum_it == projection_checksums_data.files.end() && !files_without_checksums.contains(projection_file_name))
{
auto projection_txt_checksum_it = projection_checksum_files_txt.find(file_name);
if (projection_txt_checksum_it == projection_checksum_files_txt.end()
|| projection_txt_checksum_it->second.uncompressed_size == 0)
{
auto projection_file_buf = projection_part_storage->readFile(projection_file_name, {}, std::nullopt, std::nullopt);
HashingReadBuffer projection_hashing_buf(*projection_file_buf);
projection_hashing_buf.ignoreAll();
projection_checksums_data.files[projection_file_name] = IMergeTreeDataPart::Checksums::Checksum(
projection_hashing_buf.count(), projection_hashing_buf.getHash());
}
else
{
projection_checksums_data.files[projection_file_name] = checksum_compressed_file(*projection_part_storage, projection_file_name);
}
}
}
checksums_data.files[file_name] = IMergeTreeDataPart::Checksums::Checksum(
projection_checksums_data.getTotalSizeOnDisk(), projection_checksums_data.getTotalChecksumUInt128());
if (require_checksums || !projection_checksums_txt.files.empty())
projection_checksums_txt.checkEqual(projection_checksums_data, false);
}
else
{
auto file_buf = data_part_storage.readFile(file_name, {}, std::nullopt, std::nullopt);
HashingReadBuffer hashing_buf(*file_buf);
hashing_buf.ignoreAll();
checksums_data.files[file_name] = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
}
};
bool check_uncompressed = true;
/// Do not check uncompressed for projections. But why?
bool check_uncompressed = !data_part->isProjectionPart();
/// First calculate checksums for columns data
if (part_type == MergeTreeDataPartType::Compact)
{
@ -238,10 +157,19 @@ IMergeTreeDataPart::Checksums checkDataPart(
assertEOF(*buf);
}
NameSet projections_on_disk;
const auto & checksum_files_txt = checksums_txt.files;
for (auto it = data_part_storage.iterate(); it->isValid(); it->next())
{
const String & file_name = it->name();
auto file_name = it->name();
/// We will check projections later.
if (data_part_storage.isDirectory(file_name) && endsWith(file_name, ".proj"))
{
projections_on_disk.insert(file_name);
continue;
}
auto checksum_it = checksums_data.files.find(file_name);
/// Skip files that we already calculated. Also skip metadata files that are not checksummed.
@ -260,11 +188,38 @@ IMergeTreeDataPart::Checksums checkDataPart(
}
}
for (const auto & [name, projection] : data_part->getProjectionParts())
{
if (is_cancelled())
return {};
auto projection_file = name + ".proj";
auto projection_checksums = checkDataPart(
projection, *data_part_storage.getProjection(projection_file),
projection->getColumns(), projection->getType(),
projection->getFileNamesWithoutChecksums(),
require_checksums, is_cancelled);
checksums_data.files[projection_file] = IMergeTreeDataPart::Checksums::Checksum(
projection_checksums.getTotalSizeOnDisk(),
projection_checksums.getTotalChecksumUInt128());
projections_on_disk.erase(projection_file);
}
if (require_checksums && !projections_on_disk.empty())
{
throw Exception(ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART,
"Found unexpected projection directories: {}",
fmt::join(projections_on_disk, ","));
}
if (is_cancelled())
return {};
if (require_checksums || !checksums_txt.files.empty())
checksums_txt.checkEqual(checksums_data, check_uncompressed);
return checksums_data;
}
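The design choice behind this refactor is worth spelling out: instead of duplicating per-part-type checksum logic inside the checksum_file lambda, a projection is treated as a nested data part, so checkDataPart recurses into it and records one aggregate checksum per `<name>.proj` entry in the parent's map. A self-contained toy model of the same aggregation pattern (illustrative only, not the ClickHouse API):

#include <map>
#include <string>

// Toy model: a "part" with nested projection parts, checksummed recursively,
// mirroring the aggregation pattern introduced by this hunk.
struct Part
{
    std::map<std::string, size_t> files;        // file name -> fake checksum
    std::map<std::string, Part> projections;    // projection name -> nested part
};

size_t checkPart(const Part & part, std::map<std::string, size_t> & out)
{
    size_t total = 0;
    for (const auto & [name, checksum] : part.files)
    {
        out[name] = checksum;
        total += checksum;
    }
    for (const auto & [name, projection] : part.projections)
    {
        std::map<std::string, size_t> nested;
        size_t aggregate = checkPart(projection, nested);  // recurse, like checkDataPart
        out[name + ".proj"] = aggregate;                   // one aggregate entry per projection
        total += aggregate;
    }
    return total;
}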

View File

@ -598,7 +598,7 @@ std::optional<QueryProcessingStage::Enum> StorageDistributed::getOptimizedQueryP
static bool requiresObjectColumns(const ColumnsDescription & all_columns, ASTPtr query)
{
if (!hasObjectColumns(all_columns))
if (!hasDynamicSubcolumns(all_columns))
return false;
if (!query)
@ -613,7 +613,7 @@ static bool requiresObjectColumns(const ColumnsDescription & all_columns, ASTPtr
auto name_in_storage = Nested::splitName(required_column).first;
auto column_in_storage = all_columns.tryGetPhysical(name_in_storage);
if (column_in_storage && isObject(column_in_storage->type))
if (column_in_storage && column_in_storage->type->hasDynamicSubcolumns())
return true;
}
@ -640,7 +640,7 @@ StorageSnapshotPtr StorageDistributed::getStorageSnapshotForQuery(
metadata_snapshot->getColumns(),
getContext());
auto object_columns = DB::getObjectColumns(
auto object_columns = DB::getConcreteObjectColumns(
snapshot_data->objects_by_shard.begin(),
snapshot_data->objects_by_shard.end(),
metadata_snapshot->getColumns(),

View File

@ -526,7 +526,7 @@ void StorageInMemoryMetadata::check(const NamesAndTypesList & provided_columns)
const auto * available_type = it->getMapped();
if (!isObject(*available_type)
if (!available_type->hasDynamicSubcolumns()
&& !column.type->equals(*available_type)
&& !isCompatibleEnumTypes(available_type, column.type.get()))
throw Exception(
@ -575,7 +575,7 @@ void StorageInMemoryMetadata::check(const NamesAndTypesList & provided_columns,
const auto * provided_column_type = it->getMapped();
const auto * available_column_type = jt->getMapped();
if (!isObject(*provided_column_type)
if (!provided_column_type->hasDynamicSubcolumns()
&& !provided_column_type->equals(*available_column_type)
&& !isCompatibleEnumTypes(available_column_type, provided_column_type))
throw Exception(
@ -619,7 +619,7 @@ void StorageInMemoryMetadata::check(const Block & block, bool need_all) const
listOfColumns(available_columns));
const auto * available_type = it->getMapped();
if (!isObject(*available_type)
if (!available_type->hasDynamicSubcolumns()
&& !column.type->equals(*available_type)
&& !isCompatibleEnumTypes(available_type, column.type.get()))
throw Exception(

View File

@ -462,7 +462,7 @@ void LogSink::writeData(const NameAndTypePair & name_and_type, const IColumn & c
settings.getter = createStreamGetter(name_and_type);
if (!serialize_states.contains(name))
serialization->serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
serialization->serializeBinaryBulkStatePrefix(column, settings, serialize_states[name]);
if (storage.use_marks_file)
{

View File

@ -146,7 +146,7 @@ public:
auto extended_storage_columns = storage_snapshot->getColumns(
GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects());
convertObjectsToTuples(block, extended_storage_columns);
convertDynamicColumnsToTuples(block, storage_snapshot);
}
if (storage.compress)
@ -212,10 +212,10 @@ StorageSnapshotPtr StorageMemory::getStorageSnapshot(const StorageMetadataPtr &
auto snapshot_data = std::make_unique<SnapshotData>();
snapshot_data->blocks = data.get();
if (!hasObjectColumns(metadata_snapshot->getColumns()))
if (!hasDynamicSubcolumns(metadata_snapshot->getColumns()))
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, ColumnsDescription{}, std::move(snapshot_data));
auto object_columns = getObjectColumns(
auto object_columns = getConcreteObjectColumns(
snapshot_data->blocks->begin(),
snapshot_data->blocks->end(),
metadata_snapshot->getColumns(),

View File

@ -76,7 +76,7 @@ std::optional<NameAndTypePair> StorageSnapshot::tryGetColumn(const GetColumnsOpt
{
const auto & columns = getMetadataForQuery()->getColumns();
auto column = columns.tryGetColumn(options, column_name);
if (column && (!isObject(column->type) || !options.with_extended_objects))
if (column && (!column->type->hasDynamicSubcolumns() || !options.with_extended_objects))
return column;
if (options.with_extended_objects)

View File

@ -200,7 +200,7 @@ ColumnsDescriptionByShardNum getExtendedObjectsOfRemoteTables(
auto type_name = type_col[i].get<const String &>();
auto storage_column = storage_columns.tryGetPhysical(name);
if (storage_column && isObject(storage_column->type))
if (storage_column && storage_column->type->hasDynamicSubcolumns())
res.add(ColumnDescription(std::move(name), DataTypeFactory::instance().get(type_name)));
}
}

View File

@ -0,0 +1,34 @@
<?xml version="1.0"?>
<clickhouse>
<storage_configuration>
<disks>
<backup_disk_s3_plain>
<type>s3_plain</type>
<endpoint>http://minio1:9001/root/data/disks/disk_s3_plain/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
</backup_disk_s3_plain>
<attach_disk_s3_plain>
<type>s3_plain</type>
<!-- NOTE: /backup/ is a name of BACKUP -->
<endpoint>http://minio1:9001/root/data/disks/disk_s3_plain/backup/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
</attach_disk_s3_plain>
</disks>
<policies>
<attach_policy_s3_plain>
<volumes>
<main>
<disk>attach_disk_s3_plain</disk>
</main>
</volumes>
</attach_policy_s3_plain>
</policies>
</storage_configuration>
<backups>
<allowed_disk>backup_disk_s3_plain</allowed_disk>
</backups>
</clickhouse>

View File

@ -0,0 +1,40 @@
# pylint: disable=global-statement
# pylint: disable=line-too-long
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/disk_s3.xml"],
with_minio=True,
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield
finally:
cluster.shutdown()
def test_attach_backup():
node.query(
f"""
-- BACKUP writes Ordinary like structure
set allow_deprecated_database_ordinary=1;
create database ordinary engine=Ordinary;
create table ordinary.test_backup_attach engine=MergeTree() order by tuple() as select * from numbers(100);
-- NOTE: name of backup ("backup") is significant.
backup table ordinary.test_backup_attach TO Disk('backup_disk_s3_plain', 'backup');
drop table ordinary.test_backup_attach;
attach table ordinary.test_backup_attach (number UInt64) engine=MergeTree() order by tuple() settings storage_policy='attach_policy_s3_plain';
"""
)
assert int(node.query("select count() from ordinary.test_backup_attach")) == 100

View File

@ -0,0 +1,23 @@
{"id":1,"arr":[{"k1":1,"k2":{"k3":2,"k4":3,"k5":""}},{"k1":2,"k2":{"k3":0,"k4":0,"k5":"foo"}}]}
{"id":2,"arr":[{"k1":3,"k2":{"k3":4,"k4":5,"k5":""}}]}
1 [1,2] [2,0] [3,0] ['','foo']
2 [3] [4] [5] ['']
{"arr":{"k1":1,"k2":{"k3":2,"k4":3,"k5":""}}}
{"arr":{"k1":2,"k2":{"k3":0,"k4":0,"k5":"foo"}}}
{"arr":{"k1":3,"k2":{"k3":4,"k4":5,"k5":""}}}
Array(Tuple(k1 Int8, k2 Tuple(k3 Int8, k4 Int8, k5 String)))
{"id":1,"arr":[{"k1":[{"k2":"aaa","k3":"bbb","k4":0},{"k2":"ccc","k3":"","k4":0}],"k5":{"k6":""}}]}
{"id":2,"arr":[{"k1":[{"k2":"","k3":"ddd","k4":10},{"k2":"","k3":"","k4":20}],"k5":{"k6":"foo"}}]}
1 [['aaa','ccc']] [['bbb','']] [[0,0]] ['']
2 [['','']] [['ddd','']] [[10,20]] ['foo']
{"k1":{"k2":"","k3":"","k4":20}}
{"k1":{"k2":"","k3":"ddd","k4":10}}
{"k1":{"k2":"aaa","k3":"bbb","k4":0}}
{"k1":{"k2":"ccc","k3":"","k4":0}}
Tuple(k2 String, k3 String, k4 Int8)
{"arr":[{"x":1}]}
{"arr":{"x":{"y":1},"t":{"y":2}}}
{"arr":[1,{"y":1}]}
{"arr":[2,{"y":2}]}
{"arr":[{"x":"aaa","y":[1,2,3]}]}
{"arr":[{"x":1}]}

View File

@ -0,0 +1,35 @@
-- Tags: no-fasttest
SET allow_experimental_object_type = 1;
DROP TABLE IF EXISTS t_json_array;
CREATE TABLE t_json_array (id UInt32, arr Array(JSON)) ENGINE = MergeTree ORDER BY id;
INSERT INTO t_json_array FORMAT JSONEachRow {"id": 1, "arr": [{"k1": 1, "k2": {"k3": 2, "k4": 3}}, {"k1": 2, "k2": {"k5": "foo"}}]}
INSERT INTO t_json_array FORMAT JSONEachRow {"id": 2, "arr": [{"k1": 3, "k2": {"k3": 4, "k4": 5}}]}
SET output_format_json_named_tuples_as_objects = 1;
SELECT * FROM t_json_array ORDER BY id FORMAT JSONEachRow;
SELECT id, arr.k1, arr.k2.k3, arr.k2.k4, arr.k2.k5 FROM t_json_array ORDER BY id;
SELECT arr FROM t_json_array ARRAY JOIN arr ORDER BY arr.k1 FORMAT JSONEachRow;
SELECT toTypeName(arr) FROM t_json_array LIMIT 1;
TRUNCATE TABLE t_json_array;
INSERT INTO t_json_array FORMAT JSONEachRow {"id": 1, "arr": [{"k1": [{"k2": "aaa", "k3": "bbb"}, {"k2": "ccc"}]}]}
INSERT INTO t_json_array FORMAT JSONEachRow {"id": 2, "arr": [{"k1": [{"k3": "ddd", "k4": 10}, {"k4": 20}], "k5": {"k6": "foo"}}]}
SELECT * FROM t_json_array ORDER BY id FORMAT JSONEachRow;
SELECT id, arr.k1.k2, arr.k1.k3, arr.k1.k4, arr.k5.k6 FROM t_json_array ORDER BY id;
SELECT arrayJoin(arrayJoin(arr.k1)) AS k1 FROM t_json_array ORDER BY k1 FORMAT JSONEachRow;
SELECT toTypeName(arrayJoin(arrayJoin(arr.k1))) AS arr FROM t_json_array LIMIT 1;
DROP TABLE t_json_array;
SELECT * FROM values('arr Array(JSON)', '[\'{"x" : 1}\']') FORMAT JSONEachRow;
SELECT * FROM values('arr Map(String, JSON)', '{\'x\' : \'{"y" : 1}\', \'t\' : \'{"y" : 2}\'}') FORMAT JSONEachRow;
SELECT * FROM values('arr Tuple(Int32, JSON)', '(1, \'{"y" : 1}\')', '(2, \'{"y" : 2}\')') FORMAT JSONEachRow;
SELECT * FROM format(JSONEachRow, '{"arr" : [{"x" : "aaa", "y" : [1,2,3]}]}') FORMAT JSONEachRow;
SELECT * FROM values('arr Array(JSON)', '[\'{"x" : 1}\']') FORMAT JSONEachRow;

View File

@ -0,0 +1,17 @@
Tuple(String, Map(String, Array(Tuple(k1 Nested(k2 Int8, k3 Int8, k5 String), k4 String))), Tuple(k1 String, k2 Tuple(k3 String, k4 String)))
=============
{"id":1,"data":["foo",{"aa":[{"k1":[{"k2":1,"k3":2,"k5":""},{"k2":0,"k3":3,"k5":""}],"k4":""},{"k1":[{"k2":4,"k3":0,"k5":""},{"k2":0,"k3":5,"k5":""},{"k2":6,"k3":0,"k5":""}],"k4":"qqq"}],"bb":[{"k1":[],"k4":"www"},{"k1":[{"k2":7,"k3":8,"k5":""},{"k2":9,"k3":10,"k5":""},{"k2":11,"k3":12,"k5":""}],"k4":""}]},{"k1":"aa","k2":{"k3":"bb","k4":"c"}}]}
{"id":2,"data":["bar",{"aa":[{"k1":[{"k2":13,"k3":14,"k5":""},{"k2":15,"k3":16,"k5":""}],"k4":"www"}]},{"k1":"","k2":{"k3":"","k4":""}}]}
{"id":3,"data":["some",{"aa":[{"k1":[{"k2":0,"k3":20,"k5":"some"}],"k4":""}]},{"k1":"eee","k2":{"k3":"","k4":""}}]}
=============
{"aa":[{"k1":[{"k2":1,"k3":2,"k5":""},{"k2":0,"k3":3,"k5":""}],"k4":""},{"k1":[{"k2":4,"k3":0,"k5":""},{"k2":0,"k3":5,"k5":""},{"k2":6,"k3":0,"k5":""}],"k4":"qqq"}],"bb":[{"k1":[],"k4":"www"},{"k1":[{"k2":7,"k3":8,"k5":""},{"k2":9,"k3":10,"k5":""},{"k2":11,"k3":12,"k5":""}],"k4":""}]}
{"aa":[{"k1":[{"k2":13,"k3":14,"k5":""},{"k2":15,"k3":16,"k5":""}],"k4":"www"}],"bb":[]}
{"aa":[{"k1":[{"k2":0,"k3":20,"k5":"some"}],"k4":""}],"bb":[]}
=============
{"k1":[[{"k2":1,"k3":2,"k5":""},{"k2":0,"k3":3,"k5":""}],[{"k2":4,"k3":0,"k5":""},{"k2":0,"k3":5,"k5":""},{"k2":6,"k3":0,"k5":""}]],"k4":["","qqq"]}
{"k1":[[{"k2":13,"k3":14,"k5":""},{"k2":15,"k3":16,"k5":""}]],"k4":["www"]}
{"k1":[[{"k2":0,"k3":20,"k5":"some"}]],"k4":[""]}
=============
{"obj":{"k1":"aa","k2":{"k3":"bb","k4":"c"}}}
{"obj":{"k1":"","k2":{"k3":"","k4":""}}}
{"obj":{"k1":"eee","k2":{"k3":"","k4":""}}}

Some files were not shown because too many files have changed in this diff.