mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-11 17:02:25 +00:00
Merge branch 'master' into pr-progress-bar
This commit is contained in:
commit
0bbf7ba921
2
contrib/avro
vendored
2
contrib/avro
vendored
@ -1 +1 @@
|
||||
Subproject commit 7832659ec986075d560f930c288e973c64679552
|
||||
Subproject commit 2fb8a8a6ec0eab9109b68abf3b4857e8c476b918
|
2
contrib/grpc
vendored
2
contrib/grpc
vendored
@ -1 +1 @@
|
||||
Subproject commit c52656e2bfcda3450bd6a7c247d2d9eeb8498524
|
||||
Subproject commit 80e8fd63fef4a8d35c1abe612854f238eadadf0a
|
@ -380,7 +380,7 @@ build.
|
||||
|
||||
### macOS-only: Install with Homebrew
|
||||
|
||||
To install ClickHouse using the popular `brew` package manager, follow the instructions listed in the [ClickHouse Homebrew tap](https://github.com/ClickHouse/homebrew-clickhouse).
|
||||
To install ClickHouse using [Homebrew](https://brew.sh/), see the [ClickHouse cask page](https://formulae.brew.sh/cask/clickhouse).
|
||||
|
||||
## Launch {#launch}
|
||||
|
||||
|
@ -1144,11 +1144,32 @@ SELECT arrayFold( x,acc -> acc + x*2, [1, 2, 3, 4], toInt64(3)) AS res;
|
||||
Result:
|
||||
|
||||
``` text
|
||||
┌─arrayFold(lambda(tuple(x, acc), plus(acc, multiply(x, 2))), [1, 2, 3, 4], toInt64(3))─┐
|
||||
│ 3 │
|
||||
└───────────────────────────────────────────────────────────────────────────────────────┘
|
||||
┌─res─┐
|
||||
│ 23 │
|
||||
└─────┘
|
||||
```
|
||||
|
||||
**Example with the Fibonacci sequence**
|
||||
|
||||
```sql
|
||||
SELECT arrayFold( x, acc -> (acc.2, acc.2 + acc.1), range(number), (1::Int64, 0::Int64)).1 AS fibonacci
|
||||
FROM numbers(1,10);
|
||||
|
||||
┌─fibonacci─┐
|
||||
│ 0 │
|
||||
│ 1 │
|
||||
│ 1 │
|
||||
│ 2 │
|
||||
│ 3 │
|
||||
│ 5 │
|
||||
│ 8 │
|
||||
│ 13 │
|
||||
│ 21 │
|
||||
│ 34 │
|
||||
└───────────┘
|
||||
```
|
||||
|
||||
|
||||
## arrayReverse(arr)
|
||||
|
||||
Returns an array of the same size as the original array containing the elements in reverse order.
|
||||
|
@ -62,7 +62,7 @@ Materialized views store data transformed by the corresponding [SELECT](../../..
|
||||
|
||||
When creating a materialized view without `TO [db].[table]`, you must specify `ENGINE` – the table engine for storing data.
|
||||
|
||||
When creating a materialized view with `TO [db].[table]`, you must not use `POPULATE`.
|
||||
When creating a materialized view with `TO [db].[table]`, you can't also use `POPULATE`.
|
||||
|
||||
A materialized view is implemented as follows: when inserting data to the table specified in `SELECT`, part of the inserted data is converted by this `SELECT` query, and the result is inserted in the view.
|
||||
|
||||
|
@ -104,6 +104,7 @@ if (TARGET ch_contrib::nats_io)
|
||||
endif()
|
||||
|
||||
add_headers_and_sources(dbms Storages/DataLakes)
|
||||
add_headers_and_sources(dbms Storages/DataLakes/Iceberg)
|
||||
add_headers_and_sources(dbms Common/NamedCollections)
|
||||
|
||||
if (TARGET ch_contrib::amqp_cpp)
|
||||
|
@ -128,7 +128,7 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
|
||||
}
|
||||
|
||||
std::lock_guard lock{database->metadata_mutex};
|
||||
if (!database->checkDigestValid(context))
|
||||
if (!database->checkDigestValid(context, false))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent database metadata after reconnection to ZooKeeper");
|
||||
}
|
||||
|
||||
|
@ -288,7 +288,7 @@ DictionaryHierarchyParentToChildIndexPtr FlatDictionary::getHierarchicalIndex()
|
||||
const auto & hierarchical_attribute = attributes[hierarchical_attribute_index];
|
||||
const ContainerType<UInt64> & parent_keys = std::get<ContainerType<UInt64>>(hierarchical_attribute.container);
|
||||
|
||||
HashMap<UInt64, PaddedPODArray<UInt64>> parent_to_child;
|
||||
DictionaryHierarchicalParentToChildIndex::ParentToChildIndex parent_to_child;
|
||||
parent_to_child.reserve(element_count);
|
||||
|
||||
UInt64 child_keys_size = static_cast<UInt64>(parent_keys.size());
|
||||
|
@ -328,7 +328,7 @@ DictionaryHierarchicalParentToChildIndexPtr HashedArrayDictionary<dictionary_key
|
||||
for (auto & [key, value] : key_attribute_container)
|
||||
index_to_key[value] = key;
|
||||
|
||||
HashMap<UInt64, PaddedPODArray<UInt64>> parent_to_child;
|
||||
DictionaryHierarchicalParentToChildIndex::ParentToChildIndex parent_to_child;
|
||||
parent_to_child.reserve(index_to_key.size());
|
||||
|
||||
size_t parent_keys_container_size = parent_keys_container.size();
|
||||
|
@ -4,13 +4,13 @@
|
||||
#include <boost/noncopyable.hpp>
|
||||
|
||||
#include <Common/ArenaUtils.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/MemoryTrackerBlockerInThread.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/scope_guard_safe.h>
|
||||
#include <Common/setThreadName.h>
|
||||
|
||||
#include <Core/Defines.h>
|
||||
|
||||
@ -600,7 +600,7 @@ DictionaryHierarchyParentToChildIndexPtr HashedDictionary<dictionary_key_type, s
|
||||
for (const auto & map : child_key_to_parent_key_maps)
|
||||
size += map.size();
|
||||
|
||||
HashMap<UInt64, PaddedPODArray<UInt64>> parent_to_child;
|
||||
DictionaryHierarchicalParentToChildIndex::ParentToChildIndex parent_to_child;
|
||||
parent_to_child.reserve(size);
|
||||
|
||||
for (const auto & map : child_key_to_parent_key_maps)
|
||||
|
@ -26,7 +26,12 @@ public:
|
||||
UInt32 end_index;
|
||||
};
|
||||
|
||||
explicit DictionaryHierarchicalParentToChildIndex(const HashMap<UInt64, PaddedPODArray<UInt64>> & parent_to_children_map_)
|
||||
/// By default we use initial_bytes=4096 in PodArray.
|
||||
/// It might lead to really high memory consumption when arrays are almost empty but there are a lot of them.
|
||||
using Array = PODArray<UInt64, 8 * sizeof(UInt64), Allocator<false>, PADDING_FOR_SIMD - 1, PADDING_FOR_SIMD>;
|
||||
using ParentToChildIndex = HashMap<UInt64, Array>;
|
||||
|
||||
explicit DictionaryHierarchicalParentToChildIndex(const ParentToChildIndex & parent_to_children_map_)
|
||||
{
|
||||
size_t parent_to_children_map_size = parent_to_children_map_.size();
|
||||
|
||||
|
@ -173,7 +173,7 @@ TEST(HierarchyDictionariesUtils, getIsInHierarchy)
|
||||
TEST(HierarchyDictionariesUtils, getDescendants)
|
||||
{
|
||||
{
|
||||
HashMap<UInt64, PaddedPODArray<UInt64>> parent_to_child;
|
||||
DictionaryHierarchicalParentToChildIndex::ParentToChildIndex parent_to_child;
|
||||
parent_to_child[0].emplace_back(1);
|
||||
parent_to_child[1].emplace_back(2);
|
||||
parent_to_child[1].emplace_back(3);
|
||||
@ -221,7 +221,7 @@ TEST(HierarchyDictionariesUtils, getDescendants)
|
||||
}
|
||||
}
|
||||
{
|
||||
HashMap<UInt64, PaddedPODArray<UInt64>> parent_to_child;
|
||||
DictionaryHierarchicalParentToChildIndex::ParentToChildIndex parent_to_child;
|
||||
parent_to_child[1].emplace_back(2);
|
||||
parent_to_child[2].emplace_back(1);
|
||||
|
||||
|
@ -324,11 +324,11 @@ ColumnPtr Set::execute(const ColumnsWithTypeAndName & columns, bool negative) co
|
||||
|
||||
if (!transform_null_in && data_types[i]->canBeInsideNullable())
|
||||
{
|
||||
result = castColumnAccurateOrNull(column_to_cast, data_types[i]);
|
||||
result = castColumnAccurateOrNull(column_to_cast, data_types[i], cast_cache.get());
|
||||
}
|
||||
else
|
||||
{
|
||||
result = castColumnAccurate(column_to_cast, data_types[i]);
|
||||
result = castColumnAccurate(column_to_cast, data_types[i], cast_cache.get());
|
||||
}
|
||||
|
||||
materialized_columns.emplace_back() = result;
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <Storages/MergeTree/BoolMask.h>
|
||||
|
||||
#include <Common/SharedMutex.h>
|
||||
#include <Interpreters/castColumn.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -33,9 +34,9 @@ public:
|
||||
/// This is needed for subsequent use for index.
|
||||
Set(const SizeLimits & limits_, size_t max_elements_to_fill_, bool transform_null_in_)
|
||||
: log(&Poco::Logger::get("Set")),
|
||||
limits(limits_), max_elements_to_fill(max_elements_to_fill_), transform_null_in(transform_null_in_)
|
||||
{
|
||||
}
|
||||
limits(limits_), max_elements_to_fill(max_elements_to_fill_), transform_null_in(transform_null_in_),
|
||||
cast_cache(std::make_unique<InternalCastFunctionCache>())
|
||||
{}
|
||||
|
||||
/** Set can be created either from AST or from a stream of data (subquery result).
|
||||
*/
|
||||
@ -142,6 +143,10 @@ private:
|
||||
*/
|
||||
mutable SharedMutex rwlock;
|
||||
|
||||
/// A cache for cast functions (if any) to avoid rebuilding cast functions
|
||||
/// for every call to `execute`
|
||||
mutable std::unique_ptr<InternalCastFunctionCache> cast_cache;
|
||||
|
||||
template <typename Method>
|
||||
void insertFromBlockImpl(
|
||||
Method & method,
|
||||
|
@ -7,24 +7,29 @@ namespace DB
|
||||
{
|
||||
|
||||
template <CastType cast_type = CastType::nonAccurate>
|
||||
static ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type)
|
||||
static ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, InternalCastFunctionCache * cache = nullptr)
|
||||
{
|
||||
if (arg.type->equals(*type) && cast_type != CastType::accurateOrNull)
|
||||
return arg.column;
|
||||
|
||||
const auto from_name = arg.type->getName();
|
||||
const auto to_name = type->getName();
|
||||
ColumnsWithTypeAndName arguments
|
||||
{
|
||||
arg,
|
||||
{
|
||||
DataTypeString().createColumnConst(arg.column->size(), type->getName()),
|
||||
DataTypeString().createColumnConst(arg.column->size(), to_name),
|
||||
std::make_shared<DataTypeString>(),
|
||||
""
|
||||
}
|
||||
};
|
||||
|
||||
auto get_cast_func = [&arguments]
|
||||
{
|
||||
FunctionOverloadResolverPtr func_builder_cast = CastInternalOverloadResolver<cast_type>::createImpl();
|
||||
return func_builder_cast->build(arguments);
|
||||
};
|
||||
|
||||
auto func_cast = func_builder_cast->build(arguments);
|
||||
FunctionBasePtr func_cast = cache ? cache->getOrSet(cast_type, from_name, to_name, std::move(get_cast_func)) : get_cast_func();
|
||||
|
||||
if constexpr (cast_type == CastType::accurateOrNull)
|
||||
{
|
||||
@ -36,19 +41,19 @@ static ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr
|
||||
}
|
||||
}
|
||||
|
||||
ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type)
|
||||
ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, InternalCastFunctionCache * cache)
|
||||
{
|
||||
return castColumn<CastType::nonAccurate>(arg, type);
|
||||
return castColumn<CastType::nonAccurate>(arg, type, cache);
|
||||
}
|
||||
|
||||
ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type)
|
||||
ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type, InternalCastFunctionCache * cache)
|
||||
{
|
||||
return castColumn<CastType::accurate>(arg, type);
|
||||
return castColumn<CastType::accurate>(arg, type, cache);
|
||||
}
|
||||
|
||||
ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type)
|
||||
ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type, InternalCastFunctionCache * cache)
|
||||
{
|
||||
return castColumn<CastType::accurateOrNull>(arg, type);
|
||||
return castColumn<CastType::accurateOrNull>(arg, type, cache);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,12 +1,34 @@
|
||||
#pragma once
|
||||
|
||||
#include <functional>
#include <map>
#include <mutex>
#include <tuple>
#include <Core/ColumnWithTypeAndName.h>
#include <Functions/FunctionsConversion.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type);
|
||||
ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type);
|
||||
ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type);
|
||||
struct InternalCastFunctionCache
|
||||
{
|
||||
private:
|
||||
/// Maps <cast_type, from_type, to_type> -> cast functions
|
||||
/// Doesn't own key, never refer to key after inserted
|
||||
std::map<std::tuple<CastType, String, String>, FunctionBasePtr> impl;
|
||||
mutable std::mutex mutex;
|
||||
public:
|
||||
template<typename Getter>
|
||||
FunctionBasePtr getOrSet(CastType cast_type, const String & from, const String & to, Getter && getter)
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
auto key = std::forward_as_tuple(cast_type, from, to);
|
||||
auto it = impl.find(key);
|
||||
if (it == impl.end())
|
||||
it = impl.emplace(key, getter()).first;
|
||||
return it->second;
|
||||
}
|
||||
};
|
||||
|
||||
ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, InternalCastFunctionCache * cache = nullptr);
|
||||
ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type, InternalCastFunctionCache * cache = nullptr);
|
||||
ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type, InternalCastFunctionCache * cache = nullptr);
|
||||
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int SYNTAX_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -1342,6 +1343,7 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|
||||
ParserKeyword s_view("VIEW");
|
||||
ParserKeyword s_materialized("MATERIALIZED");
|
||||
ParserKeyword s_populate("POPULATE");
|
||||
ParserKeyword s_empty("EMPTY");
|
||||
ParserKeyword s_or_replace("OR REPLACE");
|
||||
ParserToken s_dot(TokenType::Dot);
|
||||
ParserToken s_lparen(TokenType::OpeningRoundBracket);
|
||||
@ -1437,8 +1439,26 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|
||||
|
||||
if (s_populate.ignore(pos, expected))
|
||||
is_populate = true;
|
||||
else if (ParserKeyword{"EMPTY"}.ignore(pos, expected))
|
||||
else if (s_empty.ignore(pos, expected))
|
||||
is_create_empty = true;
|
||||
|
||||
if (ParserKeyword{"TO"}.ignore(pos, expected))
|
||||
throw Exception(
|
||||
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'ENGINE' and 'TO [db].[table]'");
|
||||
}
|
||||
else
|
||||
{
|
||||
if (storage_p.ignore(pos, expected))
|
||||
throw Exception(
|
||||
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'TO [db].[table]' and 'ENGINE'");
|
||||
|
||||
if (s_populate.ignore(pos, expected))
|
||||
throw Exception(
|
||||
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'TO [db].[table]' and 'POPULATE'");
|
||||
|
||||
if (s_empty.ignore(pos, expected))
|
||||
throw Exception(
|
||||
ErrorCodes::SYNTAX_ERROR, "When creating a materialized view you can't declare both 'TO [db].[table]' and 'EMPTY'");
|
||||
}
|
||||
|
||||
/// AS SELECT ...
|
||||
|
@ -160,9 +160,12 @@ static void insertNumber(IColumn & column, WhichDataType type, T value)
|
||||
}
|
||||
|
||||
template <typename DecimalType>
|
||||
static AvroDeserializer::DeserializeFn createDecimalDeserializeFn(const avro::NodePtr & root_node, const DataTypePtr & target_type)
|
||||
static AvroDeserializer::DeserializeFn createDecimalDeserializeFn(const avro::NodePtr & root_node, const DataTypePtr & target_type, bool is_fixed)
|
||||
{
|
||||
auto logical_type = root_node->logicalType();
|
||||
size_t fixed_size = 0;
|
||||
if (is_fixed)
|
||||
fixed_size = root_node->fixedSize();
|
||||
const auto & decimal_type = assert_cast<const DecimalType &>(*target_type);
|
||||
if (decimal_type.getScale() != static_cast<UInt32>(logical_type.scale()) || decimal_type.getPrecision() != static_cast<UInt32>(logical_type.precision()))
|
||||
throw Exception(
|
||||
@ -174,14 +177,18 @@ static AvroDeserializer::DeserializeFn createDecimalDeserializeFn(const avro::No
|
||||
decimal_type.getScale(),
|
||||
decimal_type.getPrecision());
|
||||
|
||||
return [tmp = std::string(), target_type](IColumn & column, avro::Decoder & decoder) mutable
|
||||
return [tmp = std::vector<uint8_t>(), target_type, fixed_size](IColumn & column, avro::Decoder & decoder) mutable
|
||||
{
|
||||
static constexpr size_t field_type_size = sizeof(typename DecimalType::FieldType);
|
||||
decoder.decodeString(tmp);
|
||||
if (tmp.size() > field_type_size)
|
||||
if (fixed_size)
|
||||
tmp = decoder.decodeFixed(fixed_size);
|
||||
else
|
||||
tmp = decoder.decodeBytes();
|
||||
|
||||
if (tmp.size() > field_type_size || tmp.empty())
|
||||
throw ParsingException(
|
||||
ErrorCodes::CANNOT_PARSE_UUID,
|
||||
"Cannot parse type {}, expected binary data with size equal to or less than {}, got {}",
|
||||
"Cannot parse type {}, expected non-empty binary data with size equal to or less than {}, got {}",
|
||||
target_type->getName(),
|
||||
field_type_size,
|
||||
tmp.size());
|
||||
@ -189,10 +196,12 @@ static AvroDeserializer::DeserializeFn createDecimalDeserializeFn(const avro::No
|
||||
{
|
||||
/// Extend value to required size by adding padding.
|
||||
/// Check if value is negative or positive.
|
||||
std::vector<uint8_t> padding;
|
||||
if (tmp[0] & 128)
|
||||
tmp = std::string(field_type_size - tmp.size(), 0xff) + tmp;
|
||||
padding = std::vector<uint8_t>(field_type_size - tmp.size(), 0xff);
|
||||
else
|
||||
tmp = std::string(field_type_size - tmp.size(), 0) + tmp;
|
||||
padding = std::vector<uint8_t>(field_type_size - tmp.size(), 0);
|
||||
tmp.insert(tmp.begin(), padding.begin(), padding.end());
|
||||
}
|
||||
|
||||
typename DecimalType::FieldType field;
|
||||
@ -282,15 +291,15 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro
|
||||
};
|
||||
}
|
||||
if (target.isDecimal32())
|
||||
return createDecimalDeserializeFn<DataTypeDecimal32>(root_node, target_type);
|
||||
return createDecimalDeserializeFn<DataTypeDecimal32>(root_node, target_type, false);
|
||||
if (target.isDecimal64())
|
||||
return createDecimalDeserializeFn<DataTypeDecimal64>(root_node, target_type);
|
||||
return createDecimalDeserializeFn<DataTypeDecimal64>(root_node, target_type, false);
|
||||
if (target.isDecimal128())
|
||||
return createDecimalDeserializeFn<DataTypeDecimal128>(root_node, target_type);
|
||||
return createDecimalDeserializeFn<DataTypeDecimal128>(root_node, target_type, false);
|
||||
if (target.isDecimal256())
|
||||
return createDecimalDeserializeFn<DataTypeDecimal256>(root_node, target_type);
|
||||
return createDecimalDeserializeFn<DataTypeDecimal256>(root_node, target_type, false);
|
||||
if (target.isDateTime64())
|
||||
return createDecimalDeserializeFn<DataTypeDateTime64>(root_node, target_type);
|
||||
return createDecimalDeserializeFn<DataTypeDateTime64>(root_node, target_type, false);
|
||||
break;
|
||||
case avro::AVRO_INT:
|
||||
if (target_type->isValueRepresentedByNumber())
|
||||
@ -515,6 +524,29 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro
|
||||
return true;
|
||||
};
|
||||
}
|
||||
if (target.isUUID())
|
||||
{
|
||||
return [tmp = std::vector<uint8_t>(), fixed_size](IColumn & column, avro::Decoder & decoder) mutable
|
||||
{
|
||||
decoder.decodeFixed(fixed_size, tmp);
|
||||
if (tmp.size() != 36)
|
||||
throw ParsingException(ErrorCodes::CANNOT_PARSE_UUID, "Cannot parse UUID from type Fixed, because it's size ({}) is not equal to the size of UUID (36)", fixed_size);
|
||||
|
||||
const UUID uuid = parseUUID({reinterpret_cast<const UInt8 *>(tmp.data()), tmp.size()});
|
||||
assert_cast<DataTypeUUID::ColumnType &>(column).insertValue(uuid);
|
||||
return true;
|
||||
};
|
||||
}
|
||||
if (target.isDecimal32())
|
||||
return createDecimalDeserializeFn<DataTypeDecimal32>(root_node, target_type, true);
|
||||
if (target.isDecimal64())
|
||||
return createDecimalDeserializeFn<DataTypeDecimal64>(root_node, target_type, true);
|
||||
if (target.isDecimal128())
|
||||
return createDecimalDeserializeFn<DataTypeDecimal128>(root_node, target_type, true);
|
||||
if (target.isDecimal256())
|
||||
return createDecimalDeserializeFn<DataTypeDecimal256>(root_node, target_type, true);
|
||||
if (target.isDateTime64())
|
||||
return createDecimalDeserializeFn<DataTypeDateTime64>(root_node, target_type, true);
|
||||
break;
|
||||
}
|
||||
case avro::AVRO_SYMBOLIC:
|
||||
@ -1210,7 +1242,16 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node)
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "ClickHouse supports only 8 and 16-bit Enum.");
|
||||
}
|
||||
case avro::Type::AVRO_FIXED:
|
||||
{
|
||||
auto logical_type = node->logicalType();
|
||||
if (logical_type.type() == avro::LogicalType::UUID)
|
||||
return std::make_shared<DataTypeUUID>();
|
||||
|
||||
if (logical_type.type() == avro::LogicalType::DECIMAL)
|
||||
return createDecimal<DataTypeDecimal>(logical_type.precision(), logical_type.scale());
|
||||
|
||||
return std::make_shared<DataTypeFixedString>(node->fixedSize());
|
||||
}
|
||||
case avro::Type::AVRO_ARRAY:
|
||||
return std::make_shared<DataTypeArray>(avroNodeToDataType(node->leafAt(0)));
|
||||
case avro::Type::AVRO_NULL:
|
||||
|
@ -122,6 +122,8 @@ static DataTypePtr parseORCType(const orc::Type * orc_type, bool skip_columns_wi
|
||||
return std::make_shared<DataTypeDate32>();
|
||||
case orc::TypeKind::TIMESTAMP:
|
||||
return std::make_shared<DataTypeDateTime64>(9);
|
||||
case orc::TypeKind::TIMESTAMP_INSTANT:
|
||||
return std::make_shared<DataTypeDateTime64>(9, "UTC");
|
||||
case orc::TypeKind::VARCHAR:
|
||||
case orc::TypeKind::BINARY:
|
||||
case orc::TypeKind::STRING:
|
||||
@ -795,7 +797,8 @@ static ColumnWithTypeAndName readColumnFromORCColumn(
|
||||
return readColumnWithNumericData<Float64, orc::DoubleVectorBatch>(orc_column, orc_type, column_name);
|
||||
case orc::DATE:
|
||||
return readColumnWithDateData(orc_column, orc_type, column_name, type_hint);
|
||||
case orc::TIMESTAMP:
|
||||
case orc::TIMESTAMP: [[fallthrough]];
|
||||
case orc::TIMESTAMP_INSTANT:
|
||||
return readColumnWithTimestampData(orc_column, orc_type, column_name);
|
||||
case orc::DECIMAL: {
|
||||
auto interal_type = parseORCType(orc_type, false, skipped);
|
||||
|
@ -1066,7 +1066,7 @@ void WindowTransform::appendChunk(Chunk & chunk)
|
||||
auto columns = chunk.detachColumns();
|
||||
block.original_input_columns = columns;
|
||||
for (auto & column : columns)
|
||||
column = recursiveRemoveLowCardinality(std::move(column)->convertToFullColumnIfConst());
|
||||
column = recursiveRemoveLowCardinality(std::move(column)->convertToFullColumnIfConst()->convertToFullColumnIfSparse());
|
||||
block.input_columns = std::move(columns);
|
||||
|
||||
// Initialize output columns.
|
||||
|
@ -27,6 +27,12 @@ public:
|
||||
, base_configuration(configuration_)
|
||||
, log(&Poco::Logger::get(getName())) {} // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
|
||||
|
||||
template <class ...Args>
|
||||
static StoragePtr create(const Configuration & configuration_, ContextPtr context_, Args && ...args)
|
||||
{
|
||||
return std::make_shared<IStorageDataLake<Storage, Name, MetadataParser>>(configuration_, context_, std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
String getName() const override { return name; }
|
||||
|
||||
static ColumnsDescription getTableStructureFromData(
|
||||
@ -109,8 +115,7 @@ static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args)
|
||||
if (configuration.format == "auto")
|
||||
configuration.format = "Parquet";
|
||||
|
||||
return std::make_shared<DataLake>(
|
||||
configuration, args.getContext(), args.table_id, args.columns, args.constraints,
|
||||
return DataLake::create(configuration, args.getContext(), args.table_id, args.columns, args.constraints,
|
||||
args.comment, getFormatSettings(args.getContext()));
|
||||
}
|
||||
|
||||
|
580
src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp
Normal file
580
src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp
Normal file
@ -0,0 +1,580 @@
|
||||
#include "config.h"
|
||||
|
||||
#if USE_AWS_S3 && USE_AVRO
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Columns/IColumn.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
#include <DataTypes/DataTypeDateTime64.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <DataTypes/DataTypeFixedString.h>
|
||||
#include <DataTypes/DataTypeMap.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <DataTypes/DataTypeUUID.h>
|
||||
#include <DataTypes/DataTypesDecimal.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
|
||||
#include <Storages/DataLakes/Iceberg/IcebergMetadata.h>
|
||||
#include <Storages/DataLakes/S3MetadataReader.h>
|
||||
#include <Storages/StorageS3.h>
|
||||
|
||||
#include <Poco/JSON/Array.h>
|
||||
#include <Poco/JSON/Object.h>
|
||||
#include <Poco/JSON/Parser.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
}
|
||||
|
||||
/// Stores the supplied metadata pieces as members and obtains the "IcebergMetadata" logger.
/// The string and the schema list are taken by value and moved into the members.
IcebergMetadata::IcebergMetadata(
    const StorageS3::Configuration & configuration_,
    ContextPtr context_,
    Int32 metadata_version_,
    Int32 format_version_,
    String manifest_list_file_,
    Int32 current_schema_id_,
    NamesAndTypesList schema_)
    : WithContext(context_)
    , configuration(configuration_)
    , metadata_version(metadata_version_)
    , format_version(format_version_)
    , manifest_list_file(std::move(manifest_list_file_))
    , current_schema_id(current_schema_id_)
    , schema(std::move(schema_))
    , log(&Poco::Logger::get("IcebergMetadata"))
{
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
/// Status of an entry in a manifest file.
/// NOTE(review): the numeric values presumably mirror the `status` field of Iceberg manifest entries - confirm against the spec.
enum class ManifestEntryStatus
{
    EXISTING = 0,
    ADDED = 1,
    DELETED = 2,
};
|
||||
|
||||
/// Kind of content stored in a file referenced by a manifest.
/// NOTE(review): the numeric values presumably mirror the `content` field of Iceberg data files - confirm against the spec.
enum class DataFileContent
{
    DATA = 0,
    POSITION_DELETES = 1,
    EQUALITY_DELETES = 2,
};
|
||||
|
||||
/**
 * Iceberg supports the following data types (see https://iceberg.apache.org/spec/#schemas-and-data-types):
 * - Primitive types:
 *   - boolean
 *   - int
 *   - long
 *   - float
 *   - double
 *   - decimal(P, S)
 *   - date
 *   - time (time of day in microseconds since midnight)
 *   - timestamp (in microseconds since 1970-01-01)
 *   - timestamptz (timestamp with timezone, stores values in UTC timezone)
 *   - string
 *   - uuid
 *   - fixed(L) (fixed-length byte array of length L)
 *   - binary
 * - Complex types:
 *   - struct(field1: Type1, field2: Type2, ...) (tuple of typed values)
 *   - list(nested_type)
 *   - map(Key, Value)
 *
 * Example of table schema in metadata:
 * {
 *     "type" : "struct",
 *     "schema-id" : 0,
 *     "fields" : [
 *         {
 *             "id" : 1,
 *             "name" : "id",
 *             "required" : false,
 *             "type" : "long"
 *         },
 *         {
 *             "id" : 2,
 *             "name" : "array",
 *             "required" : false,
 *             "type" : {
 *                 "type" : "list",
 *                 "element-id" : 5,
 *                 "element" : "int",
 *                 "element-required" : false
 *             }
 *         },
 *         {
 *             "id" : 3,
 *             "name" : "data",
 *             "required" : false,
 *             "type" : "binary"
 *         }
 *     ]
 * }
 */
|
||||
|
||||
/// Maps the name of an Iceberg primitive type to the ClickHouse data type used to represent it.
/// Handles the plain names ("int", "string", ...) as well as the parameterized forms
/// "fixed[N]" and "decimal(P, S)".
/// Throws BAD_ARGUMENTS if the name matches none of the handled types.
DataTypePtr getSimpleTypeByName(const String & type_name)
{
    if (type_name == "boolean")
        return DataTypeFactory::instance().get("Bool");
    if (type_name == "int")
        return std::make_shared<DataTypeInt32>();
    if (type_name == "long")
        return std::make_shared<DataTypeInt64>();
    if (type_name == "float")
        return std::make_shared<DataTypeFloat32>();
    if (type_name == "double")
        return std::make_shared<DataTypeFloat64>();
    if (type_name == "date")
        return std::make_shared<DataTypeDate>();
    /// Time type represents time of the day in microseconds since midnight.
    /// We don't have similar type for it, let's use just Int64.
    if (type_name == "time")
        return std::make_shared<DataTypeInt64>();
    if (type_name == "timestamp")
        return std::make_shared<DataTypeDateTime64>(6);
    if (type_name == "timestamptz")
        return std::make_shared<DataTypeDateTime64>(6, "UTC");
    if (type_name == "string" || type_name == "binary")
        return std::make_shared<DataTypeString>();
    if (type_name == "uuid")
        return std::make_shared<DataTypeUUID>();

    if (type_name.starts_with("fixed[") && type_name.ends_with(']'))
    {
        /// Parse the length between "fixed[" (6 characters) and the closing ']'.
        ReadBufferFromString buf(std::string_view(type_name.begin() + 6, type_name.end() - 1));
        size_t n;
        readIntText(n, buf);
        return std::make_shared<DataTypeFixedString>(n);
    }

    if (type_name.starts_with("decimal(") && type_name.ends_with(')'))
    {
        /// Parse "P, S" between "decimal(" (8 characters) and the closing ')'.
        ReadBufferFromString buf(std::string_view(type_name.begin() + 8, type_name.end() - 1));
        size_t precision;
        size_t scale;
        readIntText(precision, buf);
        skipWhitespaceIfAny(buf);
        assertChar(',', buf);
        skipWhitespaceIfAny(buf);
        /// Use the throwing reader here as well: a missing scale must be reported
        /// instead of leaving `scale` unset and passing it on to createDecimal.
        readIntText(scale, buf);
        return createDecimal<DataTypeDecimal>(precision, scale);
    }

    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", type_name);
}
|
||||
|
||||
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool required);
|
||||
|
||||
/// Converts a JSON object describing an Iceberg complex type ("list", "map" or "struct")
/// into the corresponding Array, Map or (named) Tuple data type.
/// Nested element types are resolved through getFieldType.
/// Throws BAD_ARGUMENTS for any other value of the "type" key.
DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
{
    const String kind = type->getValue<String>("type");

    if (kind == "list")
    {
        const bool is_element_required = type->getValue<bool>("element-required");
        return std::make_shared<DataTypeArray>(getFieldType(type, "element", is_element_required));
    }

    if (kind == "map")
    {
        /// The key type is always resolved as required.
        auto map_key_type = getFieldType(type, "key", true);
        const bool is_value_required = type->getValue<bool>("value-required");
        auto map_value_type = getFieldType(type, "value", is_value_required);
        return std::make_shared<DataTypeMap>(map_key_type, map_value_type);
    }

    if (kind == "struct")
    {
        auto struct_fields = type->get("fields").extract<Poco::JSON::Array::Ptr>();
        const size_t fields_count = struct_fields->size();

        Names names;
        DataTypes types;
        names.reserve(fields_count);
        types.reserve(fields_count);

        for (size_t pos = 0; pos < fields_count; ++pos)
        {
            auto struct_field = struct_fields->getObject(static_cast<Int32>(pos));
            names.push_back(struct_field->getValue<String>("name"));
            const bool is_field_required = struct_field->getValue<bool>("required");
            types.push_back(getFieldType(struct_field, "type", is_field_required));
        }

        return std::make_shared<DataTypeTuple>(types, names);
    }

    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", kind);
}
|
||||
|
||||
/// Resolves the data type stored under `type_key` of the JSON object `field`.
/// - If the value is an object, it is treated as a complex type; `required` is not consulted in that case.
/// - If the value is a string, it is treated as a primitive type name and is wrapped
///   into Nullable unless `required` is true.
/// - Anything else results in a BAD_ARGUMENTS exception.
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool required)
{
    if (field->isObject(type_key))
        return getComplexTypeFromObject(field->getObject(type_key));

    auto type_value = field->get(type_key);
    if (!type_value.isString())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type_value.toString());

    const String & simple_type_name = type_value.extract<String>();
    auto simple_type = getSimpleTypeByName(simple_type_name);
    if (required)
        return simple_type;
    return makeNullable(simple_type);
}
|
||||
|
||||
/// Extracts the table schema (column names and types) and its schema id from the table metadata JSON.
/// Tables whose schema has been evolved are rejected with UNSUPPORTED_METHOD:
///  - format version 2: the "schemas" array must hold exactly one schema, and its "schema-id"
///    must equal "current-schema-id";
///  - otherwise: the schema is taken from "schema"; "schemas" is optional there, but when present
///    it must not hold more than one entry. The remaining check (schema id of each manifest file
///    against the schema id in use) happens later, while manifest files are parsed.
std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version)
{
    Poco::JSON::Object::Ptr schema;
    Int32 current_schema_id;

    if (format_version != 2)
    {
        schema = metadata_object->getObject("schema");
        current_schema_id = schema->getValue<int>("schema-id");

        /// Field "schemas" is optional for version 1, but since version 2 was introduced
        /// it is usually written for new version 1 tables as well.
        if (metadata_object->has("schemas"))
        {
            auto known_schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
            if (known_schemas->size() > 1)
                throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
        }
    }
    else
    {
        current_schema_id = metadata_object->getValue<int>("current-schema-id");

        auto known_schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
        if (known_schemas->size() != 1)
            throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");

        /// Exactly one schema is left at this point.
        schema = known_schemas->getObject(0);
        if (schema->getValue<int>("schema-id") != current_schema_id)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
    }

    NamesAndTypesList names_and_types;
    auto schema_fields = schema->get("fields").extract<Poco::JSON::Array::Ptr>();
    for (size_t idx = 0; idx != schema_fields->size(); ++idx)
    {
        auto schema_field = schema_fields->getObject(static_cast<UInt32>(idx));
        auto column_name = schema_field->getValue<String>("name");
        const bool is_required = schema_field->getValue<bool>("required");
        names_and_types.push_back({column_name, getFieldType(schema_field, "type", is_required)});
    }

    return {std::move(names_and_types), current_schema_id};
}
|
||||
|
||||
/// Reads every row of an Avro data file into freshly created columns shaped like `header`.
/// The reader is initialised here, so the caller must pass a reader on which init() has not been called yet
/// (NOTE(review): inferred from the unconditional init() call below - confirm against avro::DataFileReaderBase).
MutableColumns parseAvro(
    avro::DataFileReaderBase & file_reader,
    const Block & header,
    const FormatSettings & settings)
{
    auto row_deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings);
    MutableColumns result_columns = header.cloneEmptyColumns();

    file_reader.init();
    for (RowReadExtension read_extension; file_reader.hasMore();)
    {
        file_reader.decr();
        row_deserializer->deserializeRow(result_columns, file_reader.decoder(), read_extension);
    }

    return result_columns;
}
|
||||
|
||||
/**
|
||||
* Each version of table metadata is stored in a `metadata` directory and
|
||||
* has format: v<V>.metadata.json, where V - metadata version.
|
||||
*/
|
||||
std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuration & configuration)
|
||||
{
|
||||
const auto metadata_files = S3DataLakeMetadataReadHelper::listFiles(configuration, "metadata", ".metadata.json");
|
||||
if (metadata_files.empty())
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::FILE_DOESNT_EXIST,
|
||||
"The metadata file for Iceberg table with path {} doesn't exist",
|
||||
configuration.url.key);
|
||||
}
|
||||
|
||||
std::vector<std::pair<UInt32, String>> metadata_files_with_versions;
|
||||
metadata_files_with_versions.reserve(metadata_files.size());
|
||||
for (const auto & path : metadata_files)
|
||||
{
|
||||
String file_name(path.begin() + path.find_last_of('/') + 1, path.end());
|
||||
String version_str(file_name.begin() + 1, file_name.begin() + file_name.find_first_of('.'));
|
||||
if (!std::all_of(version_str.begin(), version_str.end(), isdigit))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
|
||||
metadata_files_with_versions.emplace_back(std::stoi(version_str), path);
|
||||
}
|
||||
|
||||
/// Get the latest version of metadata file: v<V>.metadata.json
|
||||
return *std::max_element(metadata_files_with_versions.begin(), metadata_files_with_versions.end());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Reads the latest `v<V>.metadata.json` of the table and builds an IcebergMetadata from it:
/// format version, table schema with its id, and the manifest list of the current snapshot.
/// The manifest list path stays empty when no entry of "snapshots" matches "current-snapshot-id".
std::unique_ptr<IcebergMetadata> parseIcebergMetadata(const StorageS3::Configuration & configuration, ContextPtr context_)
{
    const auto [metadata_version, metadata_file_path] = getMetadataFileAndVersion(configuration);
    LOG_DEBUG(&Poco::Logger::get("IcebergMetadata"), "Parse metadata {}", metadata_file_path);

    String metadata_json;
    auto metadata_buf = S3DataLakeMetadataReadHelper::createReadBuffer(metadata_file_path, context_, configuration);
    readJSONObjectPossiblyInvalid(metadata_json, *metadata_buf);

    /// For some reason base/base/JSON.h can not parse this json file, so Poco is used instead.
    Poco::JSON::Parser json_parser;
    Poco::Dynamic::Var parsed_json = json_parser.parse(metadata_json);
    Poco::JSON::Object::Ptr metadata_object = parsed_json.extract<Poco::JSON::Object::Ptr>();

    auto format_version = metadata_object->getValue<int>("format-version");
    auto [schema, schema_id] = parseTableSchema(metadata_object, format_version);

    const auto current_snapshot_id = metadata_object->getValue<Int64>("current-snapshot-id");
    auto snapshots = metadata_object->get("snapshots").extract<Poco::JSON::Array::Ptr>();

    String manifest_list_file;
    for (size_t idx = 0; idx < snapshots->size(); ++idx)
    {
        const auto snapshot = snapshots->getObject(static_cast<UInt32>(idx));
        if (snapshot->getValue<Int64>("snapshot-id") != current_snapshot_id)
            continue;

        /// Only the file name of the recorded path is kept; it is re-rooted under <table key>/metadata.
        const auto recorded_path = snapshot->getValue<String>("manifest-list");
        const auto manifest_list_name = std::filesystem::path(recorded_path).filename();
        manifest_list_file = std::filesystem::path(configuration.url.key) / "metadata" / manifest_list_name;
        break;
    }

    return std::make_unique<IcebergMetadata>(configuration, context_, metadata_version, format_version, manifest_list_file, schema_id, schema);
}
|
||||
|
||||
/**
|
||||
* Manifest file has the following format: '/iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro'
|
||||
*
|
||||
* `manifest file` is different in format version V1 and V2 and has the following contents:
|
||||
* v1 v2
|
||||
* status req req
|
||||
* snapshot_id req opt
|
||||
* sequence_number opt
|
||||
* file_sequence_number opt
|
||||
* data_file req req
|
||||
* Example format version V1:
|
||||
* ┌─status─┬─────────snapshot_id─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
|
||||
* │ 1 │ 2819310504515118887 │ ('/iceberg_data/db/table_name/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0) │
|
||||
* └────────┴─────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
* Example format version V2:
|
||||
* ┌─status─┬─────────snapshot_id─┬─sequence_number─┬─file_sequence_number─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
|
||||
* │ 1 │ 5887006101709926452 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ (0,'/iceberg_data/db/table_name/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0) │
|
||||
* └────────┴─────────────────────┴─────────────────┴──────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
* In case of partitioned data we'll have extra directory partition=value:
|
||||
* ─status─┬─────────snapshot_id─┬─data_file──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
|
||||
* │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=0/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00001.parquet','PARQUET',(0),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],NULL,[4],0) │
|
||||
* │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=1/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00002.parquet','PARQUET',(1),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'2')],[(1,'\0\0\0\0\0\0\0'),(2,'2')],NULL,[4],0) │
|
||||
* │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=2/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00003.parquet','PARQUET',(2),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'3')],[(1,'\0\0\0\0\0\0\0'),(2,'3')],NULL,[4],0) │
|
||||
* └────────┴─────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
*/
|
||||
/// Returns the keys of the data files listed by the manifest list of this metadata.
///
/// The manifest list (Avro) is read to obtain the manifest files, then every manifest file (Avro)
/// is read to obtain its `status` and `data_file` columns. Entries with status DELETED are skipped,
/// all other entries contribute their `file_path`, cut so that it starts at the table key.
/// A non-empty result is saved in `data_files` and returned as is by later calls;
/// an empty manifest list path yields an empty result.
///
/// Throws:
///  - UNSUPPORTED_METHOD if a manifest was written with another schema id, or (format version 2)
///    if an entry is not a data file, i.e. describes positional or equality deletes;
///  - ILLEGAL_COLUMN / BAD_ARGUMENTS if the Avro files do not have the expected layout
///    or a data file path does not contain the table key.
Strings IcebergMetadata::getDataFiles()
{
    if (!data_files.empty())
        return data_files;

    Strings manifest_files;
    if (manifest_list_file.empty())
        return data_files;

    LOG_TEST(log, "Collect manifest files from manifest list {}", manifest_list_file);

    auto manifest_list_buf = S3DataLakeMetadataReadHelper::createReadBuffer(manifest_list_file, getContext(), configuration);
    auto manifest_list_file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));

    /// Only the first field of the manifest list records is read; it is exposed as `manifest_path`.
    auto data_type = AvroSchemaReader::avroNodeToDataType(manifest_list_file_reader->dataSchema().root()->leafAt(0));
    Block header{{data_type->createColumn(), data_type, "manifest_path"}};
    auto columns = parseAvro(*manifest_list_file_reader, header, getFormatSettings(getContext()));
    auto & col = columns.at(0);

    if (col->getDataType() != TypeIndex::String)
    {
        throw Exception(
            ErrorCodes::ILLEGAL_COLUMN,
            "The parsed column from Avro file of `manifest_path` field should be String type, got {}",
            col->getFamilyName());
    }

    const auto * col_str = typeid_cast<ColumnString *>(col.get());
    for (size_t i = 0; i < col_str->size(); ++i)
    {
        /// Only the file name of the recorded path is kept; it is re-rooted under <table key>/metadata.
        const auto file_path = col_str->getDataAt(i).toView();
        const auto filename = std::filesystem::path(file_path).filename();
        manifest_files.emplace_back(std::filesystem::path(configuration.url.key) / "metadata" / filename);
    }

    NameSet files;
    LOG_TEST(log, "Collect data files");
    for (const auto & manifest_file : manifest_files)
    {
        LOG_TEST(log, "Process manifest file {}", manifest_file);

        auto buffer = S3DataLakeMetadataReadHelper::createReadBuffer(manifest_file, getContext(), configuration);
        auto manifest_file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));

        /// Manifest file should always have table schema in avro file metadata. By now we don't support tables with evolved schema,
        /// so we should check if all manifest files have the same schema as in table metadata.
        auto avro_metadata = manifest_file_reader->metadata();
        std::vector<uint8_t> schema_json = avro_metadata["schema"];
        String schema_json_string = String(reinterpret_cast<char *>(schema_json.data()), schema_json.size());
        Poco::JSON::Parser parser;
        Poco::Dynamic::Var json = parser.parse(schema_json_string);
        Poco::JSON::Object::Ptr schema_object = json.extract<Poco::JSON::Object::Ptr>();
        if (schema_object->getValue<int>("schema-id") != current_schema_id)
            throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");

        avro::NodePtr root_node = manifest_file_reader->dataSchema().root();
        size_t leaves_num = root_node->leaves();
        size_t expected_min_num = format_version == 1 ? 3 : 2;
        if (leaves_num < expected_min_num)
        {
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS,
                "Unexpected number of columns {}. Expected at least {}",
                leaves_num, expected_min_num);
        }

        /// `status` is the first field of a manifest entry.
        avro::NodePtr status_node = root_node->leafAt(0);
        if (status_node->type() != avro::Type::AVRO_INT)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `status` field should be Int type, got {}",
                magic_enum::enum_name(status_node->type()));
        }

        /// `data_file` is the last field of a manifest entry in both format versions.
        avro::NodePtr data_file_node = root_node->leafAt(static_cast<int>(leaves_num) - 1);
        if (data_file_node->type() != avro::Type::AVRO_RECORD)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
                magic_enum::enum_name(data_file_node->type()));
        }

        auto status_col_data_type = AvroSchemaReader::avroNodeToDataType(status_node);
        auto data_col_data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
        Block manifest_file_header
            = {{status_col_data_type->createColumn(), status_col_data_type, "status"},
               {data_col_data_type->createColumn(), data_col_data_type, "data_file"}};

        columns = parseAvro(*manifest_file_reader, manifest_file_header, getFormatSettings(getContext()));
        if (columns.size() != 2)
            throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Unexpected number of columns. Expected 2, got {}", columns.size());

        if (columns.at(0)->getDataType() != TypeIndex::Int32)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `status` field should be Int32 type, got {}",
                columns.at(0)->getFamilyName());
        }
        if (columns.at(1)->getDataType() != TypeIndex::Tuple)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
                columns.at(1)->getFamilyName());
        }

        const auto * status_int_column = assert_cast<ColumnInt32 *>(columns.at(0).get());
        const auto & data_file_tuple_type = assert_cast<const DataTypeTuple &>(*data_col_data_type.get());
        const auto * data_file_tuple_column = assert_cast<ColumnTuple *>(columns.at(1).get());

        if (status_int_column->size() != data_file_tuple_column->size())
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `data_file` and `status` have different rows number: {} and {}",
                data_file_tuple_column->size(),
                status_int_column->size());
        }

        ColumnPtr file_path_column = data_file_tuple_column->getColumnPtr(data_file_tuple_type.getPositionByName("file_path"));

        if (file_path_column->getDataType() != TypeIndex::String)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `file_path` field should be String type, got {}",
                file_path_column->getFamilyName());
        }

        const auto * file_path_string_column = assert_cast<const ColumnString *>(file_path_column.get());

        /// The `content` element of `data_file` is only looked up for format version 2.
        ColumnPtr content_column;
        const ColumnInt32 * content_int_column = nullptr;
        if (format_version == 2)
        {
            content_column = data_file_tuple_column->getColumnPtr(data_file_tuple_type.getPositionByName("content"));
            if (content_column->getDataType() != TypeIndex::Int32)
            {
                throw Exception(
                    ErrorCodes::ILLEGAL_COLUMN,
                    "The parsed column from Avro file of `content` field should be Int type, got {}",
                    content_column->getFamilyName());
            }

            content_int_column = assert_cast<const ColumnInt32 *>(content_column.get());
        }

        for (size_t i = 0; i < data_file_tuple_column->size(); ++i)
        {
            if (format_version == 2)
            {
                Int32 content_type = content_int_column->getElement(i);
                if (DataFileContent(content_type) != DataFileContent::DATA)
                    throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
            }

            const auto status = status_int_column->getInt(i);
            const auto data_path = std::string(file_path_string_column->getDataAt(i).toView());

            /// The position must be validated before it is used: substr(npos) would throw std::out_of_range
            /// and hide the descriptive error below.
            const auto pos = data_path.find(configuration.url.key);
            if (pos == std::string::npos)
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration.url.key, data_path);

            /// Drop everything in front of the table key, so the result is a key usable with this configuration.
            const auto file_path = data_path.substr(pos);

            if (ManifestEntryStatus(status) == ManifestEntryStatus::DELETED)
            {
                LOG_TEST(log, "Processing delete file for path: {}", file_path);
                chassert(!files.contains(file_path));
            }
            else
            {
                LOG_TEST(log, "Processing data file for path: {}", file_path);
                files.insert(file_path);
            }
        }
    }

    data_files = std::vector<std::string>(files.begin(), files.end());
    return data_files;
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
95
src/Storages/DataLakes/Iceberg/IcebergMetadata.h
Normal file
95
src/Storages/DataLakes/Iceberg/IcebergMetadata.h
Normal file
@ -0,0 +1,95 @@
|
||||
#pragma once
|
||||
|
||||
#if USE_AWS_S3 && USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
|
||||
|
||||
#include <Storages/StorageS3.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Core/Types.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/**
|
||||
* Useful links:
|
||||
* - https://iceberg.apache.org/spec/
|
||||
*
|
||||
* Iceberg has two format versions, v1 and v2. The content of metadata files depends on the version.
|
||||
*
|
||||
* Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`.
|
||||
* Metadata file - json file.
|
||||
* Manifest list – an Avro file that lists manifest files; one per snapshot.
|
||||
* Manifest file – an Avro file that lists data or delete files; a subset of a snapshot.
|
||||
* All changes to table state create a new metadata file and replace the old metadata with an atomic swap.
|
||||
*
|
||||
* In order to find out which data files to read, we need to find the `manifest list`
|
||||
* which corresponds to the latest snapshot. We find it by checking a list of snapshots
|
||||
* in metadata's "snapshots" section.
|
||||
*
|
||||
* Example of metadata.json file.
|
||||
* {
|
||||
* "format-version" : 1,
|
||||
* "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5",
|
||||
* "location" : "/iceberg_data/db/table_name",
|
||||
* "last-updated-ms" : 1680206743150,
|
||||
* "last-column-id" : 2,
|
||||
* "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {<field1_info>}, {<field2_info>}, ... ] },
|
||||
* "current-schema-id" : 0,
|
||||
* "schemas" : [ ],
|
||||
* ...
|
||||
* "current-snapshot-id" : 2819310504515118887,
|
||||
* "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } },
|
||||
* "snapshots" : [ {
|
||||
* "snapshot-id" : 2819310504515118887,
|
||||
* "timestamp-ms" : 1680206743150,
|
||||
* "summary" : {
|
||||
* "operation" : "append", "spark.app.id" : "local-1680206733239",
|
||||
* "added-data-files" : "1", "added-records" : "100",
|
||||
* "added-files-size" : "1070", "changed-partition-count" : "1",
|
||||
* "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0",
|
||||
* "total-position-deletes" : "0", "total-equality-deletes" : "0"
|
||||
* },
|
||||
* "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro",
|
||||
* "schema-id" : 0
|
||||
* } ],
|
||||
* "statistics" : [ ],
|
||||
* "snapshot-log" : [ ... ],
|
||||
* "metadata-log" : [ ]
|
||||
* }
|
||||
*/
|
||||
class IcebergMetadata : WithContext
|
||||
{
|
||||
public:
|
||||
IcebergMetadata(const StorageS3::Configuration & configuration_,
|
||||
ContextPtr context_,
|
||||
Int32 metadata_version_,
|
||||
Int32 format_version_,
|
||||
String manifest_list_file_,
|
||||
Int32 current_schema_id_,
|
||||
NamesAndTypesList schema_);
|
||||
|
||||
/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
|
||||
/// All subsequent calls will return saved list of files (because it cannot be changed without changing metadata file)
|
||||
Strings getDataFiles();
|
||||
|
||||
/// Get table schema parsed from metadata.
|
||||
NamesAndTypesList getTableSchema() const { return schema; }
|
||||
|
||||
size_t getVersion() const { return metadata_version; }
|
||||
|
||||
private:
|
||||
const StorageS3::Configuration configuration;
|
||||
Int32 metadata_version;
|
||||
Int32 format_version;
|
||||
String manifest_list_file;
|
||||
Int32 current_schema_id;
|
||||
NamesAndTypesList schema;
|
||||
Strings data_files;
|
||||
Poco::Logger * log;
|
||||
|
||||
};
|
||||
|
||||
std::unique_ptr<IcebergMetadata> parseIcebergMetadata(const StorageS3::Configuration & configuration, ContextPtr context);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
70
src/Storages/DataLakes/Iceberg/StorageIceberg.cpp
Normal file
70
src/Storages/DataLakes/Iceberg/StorageIceberg.cpp
Normal file
@ -0,0 +1,70 @@
|
||||
#include <Storages/DataLakes/Iceberg/StorageIceberg.h>
|
||||
|
||||
#if USE_AWS_S3 && USE_AVRO
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Builds a StorageIceberg: parses the table metadata with an updated copy of `base_configuration`,
/// uses the data files found there as the S3 keys to read, and takes the column list from the metadata
/// when the caller did not supply one.
StoragePtr StorageIceberg::create(
    const DB::StorageIceberg::Configuration & base_configuration,
    DB::ContextPtr context_,
    const DB::StorageID & table_id_,
    const DB::ColumnsDescription & columns_,
    const DB::ConstraintsDescription & constraints_,
    const DB::String & comment,
    std::optional<FormatSettings> format_settings_)
{
    Configuration configuration = base_configuration;
    configuration.update(context_);

    auto metadata = parseIcebergMetadata(configuration, context_);

    /// Explicitly given columns win over the schema stored in the metadata.
    const ColumnsDescription storage_columns = columns_.empty() ? ColumnsDescription(metadata->getTableSchema()) : columns_;

    configuration.keys = metadata->getDataFiles();

    return std::make_shared<StorageIceberg>(
        std::move(metadata),
        configuration,
        context_,
        table_id_,
        storage_columns,
        constraints_,
        comment,
        format_settings_);
}
|
||||
|
||||
/// Forwards everything except the metadata to StorageS3, takes ownership of the parsed metadata
/// and keeps a copy of the configuration as the base for later configuration updates.
StorageIceberg::StorageIceberg(
    std::unique_ptr<IcebergMetadata> metadata_,
    const Configuration & configuration_,
    ContextPtr context_,
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    std::optional<FormatSettings> format_settings_)
    : StorageS3(
        configuration_,
        context_,
        table_id_,
        columns_,
        constraints_,
        comment,
        format_settings_)
    , current_metadata(std::move(metadata_))
    , base_configuration(configuration_)
{
}
|
||||
|
||||
/// Returns the table structure recorded in the Iceberg metadata.
/// Works on an updated copy of `base_configuration`; the argument itself is not modified
/// and the format settings are not used.
ColumnsDescription StorageIceberg::getTableStructureFromData(
    Configuration & base_configuration,
    const std::optional<FormatSettings> &,
    ContextPtr local_context)
{
    Configuration configuration = base_configuration;
    configuration.update(local_context);

    const auto metadata = parseIcebergMetadata(configuration, local_context);
    return ColumnsDescription(metadata->getTableSchema());
}
|
||||
|
||||
/// Refreshes the base configuration and the table metadata, and passes the result on to StorageS3.
/// Called with `configuration_update_mutex` held by updateConfiguration / updateConfigurationAndGetCopy.
void StorageIceberg::updateConfigurationImpl(ContextPtr local_context)
{
    const bool updated = base_configuration.update(local_context);
    auto new_metadata = parseIcebergMetadata(base_configuration, local_context);

    const bool metadata_changed = new_metadata->getVersion() != current_metadata->getVersion();

    /// Nothing was changed: neither the configuration nor the metadata version.
    /// (If only the configuration was updated we must still go on, otherwise StorageS3 would never see the update.)
    if (!updated && !metadata_changed)
        return;

    if (metadata_changed)
        current_metadata = std::move(new_metadata);

    auto updated_configuration{base_configuration};
    /// If metadata wasn't changed, we won't list data files again.
    updated_configuration.keys = current_metadata->getDataFiles();
    StorageS3::useConfiguration(updated_configuration);
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
84
src/Storages/DataLakes/Iceberg/StorageIceberg.h
Normal file
84
src/Storages/DataLakes/Iceberg/StorageIceberg.h
Normal file
@ -0,0 +1,84 @@
|
||||
#pragma once
|
||||
|
||||
#include "config.h"
|
||||
|
||||
#if USE_AWS_S3 && USE_AVRO
|
||||
|
||||
# include <filesystem>
|
||||
# include <Formats/FormatFactory.h>
|
||||
# include <Storages/DataLakes/Iceberg/IcebergMetadata.h>
|
||||
# include <Storages/IStorage.h>
|
||||
# include <Storages/StorageFactory.h>
|
||||
# include <Storages/StorageS3.h>
|
||||
# include <Common/logger_useful.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Storage for read-only integration with Apache Iceberg tables in Amazon S3 (see https://iceberg.apache.org/)
|
||||
/// Right now it's implemented on top of StorageS3 and right now it doesn't support
|
||||
/// many Iceberg features like schema evolution, partitioning, positional and equality deletes.
|
||||
/// TODO: Implement Iceberg as a separate storage using IObjectStorage
|
||||
/// (to support all object storages, not only S3) and add support for missing Iceberg features.
|
||||
class StorageIceberg : public StorageS3
|
||||
{
|
||||
public:
|
||||
static constexpr auto name = "Iceberg";
|
||||
|
||||
using Configuration = StorageS3::Configuration;
|
||||
|
||||
static StoragePtr create(const Configuration & base_configuration,
|
||||
ContextPtr context_,
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
std::optional<FormatSettings> format_settings_);
|
||||
|
||||
StorageIceberg(
|
||||
std::unique_ptr<IcebergMetadata> metadata_,
|
||||
const Configuration & configuration_,
|
||||
ContextPtr context_,
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
std::optional<FormatSettings> format_settings_);
|
||||
|
||||
String getName() const override { return name; }
|
||||
|
||||
static ColumnsDescription getTableStructureFromData(
|
||||
Configuration & base_configuration,
|
||||
const std::optional<FormatSettings> &,
|
||||
ContextPtr local_context);
|
||||
|
||||
static Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context)
|
||||
{
|
||||
return StorageS3::getConfiguration(engine_args, local_context, /* get_format_from_file */false);
|
||||
}
|
||||
|
||||
Configuration updateConfigurationAndGetCopy(ContextPtr local_context) override
|
||||
{
|
||||
std::lock_guard lock(configuration_update_mutex);
|
||||
updateConfigurationImpl(local_context);
|
||||
return StorageS3::getConfiguration();
|
||||
}
|
||||
|
||||
void updateConfiguration(ContextPtr local_context) override
|
||||
{
|
||||
std::lock_guard lock(configuration_update_mutex);
|
||||
updateConfigurationImpl(local_context);
|
||||
}
|
||||
|
||||
private:
|
||||
void updateConfigurationImpl(ContextPtr local_context);
|
||||
|
||||
std::unique_ptr<IcebergMetadata> current_metadata;
|
||||
Configuration base_configuration;
|
||||
std::mutex configuration_update_mutex;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -1,361 +0,0 @@
|
||||
#include "config.h"
|
||||
|
||||
#if USE_AWS_S3 && USE_AVRO
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Columns/IColumn.h>
|
||||
#include <Storages/DataLakes/IcebergMetadataParser.h>
|
||||
#include <Storages/DataLakes/S3MetadataReader.h>
|
||||
#include <Storages/StorageS3.h>
|
||||
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
||||
#include <Poco/JSON/Array.h>
|
||||
#include <Poco/JSON/Object.h>
|
||||
#include <Poco/JSON/Parser.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
template <typename Configuration, typename MetadataReadHelper>
|
||||
struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
|
||||
{
|
||||
Poco::Logger * log = &Poco::Logger::get("IcebergMetadataParser");
|
||||
|
||||
/**
|
||||
* Useful links:
|
||||
* - https://iceberg.apache.org/spec/
|
||||
*/
|
||||
|
||||
/**
|
||||
* Iceberg has two format versions, currently we support only format V1.
|
||||
*
|
||||
* Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`.
|
||||
* Metadata file - json file.
|
||||
* Manifest list – a file that lists manifest files; one per snapshot.
|
||||
* Manifest file – a file that lists data or delete files; a subset of a snapshot.
|
||||
* All changes to table state create a new metadata file and replace the old metadata with an atomic swap.
|
||||
*/
|
||||
|
||||
static constexpr auto metadata_directory = "metadata";
|
||||
|
||||
/**
|
||||
* Each version of table metadata is stored in a `metadata` directory and
|
||||
* has format: v<V>.metadata.json, where V - metadata version.
|
||||
*/
|
||||
String getMetadataFile(const Configuration & configuration)
{
    static constexpr auto metadata_file_suffix = ".metadata.json";

    const auto metadata_files = MetadataReadHelper::listFiles(configuration, metadata_directory, metadata_file_suffix);
    if (metadata_files.empty())
    {
        throw Exception(
            ErrorCodes::FILE_DOESNT_EXIST,
            "The metadata file for Iceberg table with path {} doesn't exist",
            configuration.url.key);
    }

    /// Extracts the numeric version V from a path like `<dir>/v<V>.metadata.json`.
    /// The leading `v` is optional; a file name without leading digits yields 0.
    const auto metadata_version = [](const auto & path) -> UInt64
    {
        size_t pos = path.find_last_of('/');
        pos = (pos == std::string::npos) ? 0 : pos + 1;
        if (pos < path.size() && path[pos] == 'v')
            ++pos;

        UInt64 version = 0;
        for (; pos < path.size() && path[pos] >= '0' && path[pos] <= '9'; ++pos)
            version = version * 10 + static_cast<UInt64>(path[pos] - '0');
        return version;
    };

    /// Get the latest version of metadata file: v<V>.metadata.json.
    /// Versions must be compared as numbers: a plain string comparison orders "v9" after "v10".
    /// Paths with equal versions fall back to the string order.
    return *std::max_element(
        metadata_files.begin(),
        metadata_files.end(),
        [&](const auto & lhs, const auto & rhs)
        {
            const auto lhs_version = metadata_version(lhs);
            const auto rhs_version = metadata_version(rhs);
            return lhs_version != rhs_version ? lhs_version < rhs_version : lhs < rhs;
        });
}
|
||||
|
||||
/**
|
||||
* In order to find out which data files to read, we need to find the `manifest list`
|
||||
* which corresponds to the latest snapshot. We find it by checking a list of snapshots
|
||||
* in metadata's "snapshots" section.
|
||||
*
|
||||
* Example of metadata.json file.
|
||||
*
|
||||
* {
|
||||
* "format-version" : 1,
|
||||
* "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5",
|
||||
* "location" : "/iceberg_data/db/table_name",
|
||||
* "last-updated-ms" : 1680206743150,
|
||||
* "last-column-id" : 2,
|
||||
* "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {<field1_info>}, {<field2_info>}, ... ] },
|
||||
* "current-schema-id" : 0,
|
||||
* "schemas" : [ ],
|
||||
* ...
|
||||
* "current-snapshot-id" : 2819310504515118887,
|
||||
* "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } },
|
||||
* "snapshots" : [ {
|
||||
* "snapshot-id" : 2819310504515118887,
|
||||
* "timestamp-ms" : 1680206743150,
|
||||
* "summary" : {
|
||||
* "operation" : "append", "spark.app.id" : "local-1680206733239",
|
||||
* "added-data-files" : "1", "added-records" : "100",
|
||||
* "added-files-size" : "1070", "changed-partition-count" : "1",
|
||||
* "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0",
|
||||
* "total-position-deletes" : "0", "total-equality-deletes" : "0"
|
||||
* },
|
||||
* "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro",
|
||||
* "schema-id" : 0
|
||||
* } ],
|
||||
* "statistics" : [ ],
|
||||
* "snapshot-log" : [ ... ],
|
||||
* "metadata-log" : [ ]
|
||||
* }
|
||||
*/
|
||||
struct Metadata
|
||||
{
|
||||
int format_version;
|
||||
String manifest_list;
|
||||
Strings manifest_files;
|
||||
};
|
||||
/// Parses the latest metadata json file: reads "format-version" and resolves the path
/// to the manifest list of the snapshot whose id equals "current-snapshot-id".
/// If no such snapshot is listed, `manifest_list` in the result stays empty.
Metadata processMetadataFile(const Configuration & configuration, ContextPtr context)
{
    const auto metadata_path = getMetadataFile(configuration);
    auto read_buffer = MetadataReadHelper::createReadBuffer(metadata_path, context, configuration);
    String json_text;
    readJSONObjectPossiblyInvalid(json_text, *read_buffer);

    /// For some reason base/base/JSON.h can not parse this json file
    Poco::JSON::Parser json_parser;
    Poco::Dynamic::Var parsed_json = json_parser.parse(json_text);
    Poco::JSON::Object::Ptr root = parsed_json.extract<Poco::JSON::Object::Ptr>();

    Metadata metadata;
    metadata.format_version = root->getValue<int>("format-version");

    const auto wanted_snapshot_id = root->getValue<Int64>("current-snapshot-id");
    auto snapshot_array = root->get("snapshots").extract<Poco::JSON::Array::Ptr>();

    const size_t snapshot_count = snapshot_array->size();
    for (size_t idx = 0; idx != snapshot_count; ++idx)
    {
        const auto entry = snapshot_array->getObject(static_cast<UInt32>(idx));
        if (entry->getValue<Int64>("snapshot-id") != wanted_snapshot_id)
            continue;

        /// Only the file name of the recorded path is used: the file is looked up
        /// in the `metadata` directory under the configured table path.
        const auto recorded_path = entry->getValue<String>("manifest-list");
        const auto manifest_list_name = std::filesystem::path(recorded_path).filename();
        metadata.manifest_list = std::filesystem::path(configuration.url.key) / metadata_directory / manifest_list_name;
        break;
    }

    return metadata;
}
|
||||
|
||||
/**
|
||||
* Manifest list has Avro as default format (and currently we support only Avro).
|
||||
* Manifest list file name has the following format: snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro
|
||||
*
|
||||
* `manifest list` has the following contents:
|
||||
* ┌─manifest_path────────────────────────────────────────────────────────────────────────────────────────┬─manifest_length─┬─partition_spec_id─┬───added_snapshot_id─┬─added_data_files_count─┬─existing_data_files_count─┬─deleted_data_files_count─┬─partitions─┬─added_rows_count─┬─existing_rows_count─┬─deleted_rows_count─┐
|
||||
* │ /iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro │ 5813 │ 0 │ 2819310504515118887 │ 1 │ 0 │ 0 │ [] │ 100 │ 0 │ 0 │
|
||||
* └──────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────┴───────────────────┴─────────────────────┴────────────────────────┴───────────────────────────┴──────────────────────────┴────────────┴──────────────────┴─────────────────────┴────────────────────┘
|
||||
*/
|
||||
/// Reads the manifest list (Avro) and appends the path of every manifest file
/// to `metadata.manifest_files`. Only the first field of the Avro record is parsed,
/// under the column name `manifest_path`.
void processManifestList(Metadata & metadata, const Configuration & configuration, ContextPtr context)
{
    auto read_buffer = MetadataReadHelper::createReadBuffer(metadata.manifest_list, context, configuration);
    auto avro_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*read_buffer));

    auto path_type = AvroSchemaReader::avroNodeToDataType(avro_reader->dataSchema().root()->leafAt(0));
    Block header{{path_type->createColumn(), path_type, "manifest_path"}};
    auto parsed_columns = parseAvro(*avro_reader, header, getFormatSettings(context));
    auto & path_column = parsed_columns.at(0);

    const bool is_string_column = path_column->getDataType() == TypeIndex::String;
    if (!is_string_column)
    {
        throw Exception(
            ErrorCodes::ILLEGAL_COLUMN,
            "The parsed column from Avro file of `manifest_path` field should be String type, got {}",
            path_column->getFamilyName());
    }

    const auto * manifest_paths = typeid_cast<ColumnString *>(path_column.get());
    const size_t rows = manifest_paths->size();
    for (size_t row = 0; row != rows; ++row)
    {
        /// Keep only the file name: manifest files are looked up in the `metadata`
        /// directory under the configured table path.
        const auto recorded_path = manifest_paths->getDataAt(row).toView();
        const auto manifest_name = std::filesystem::path(recorded_path).filename();
        metadata.manifest_files.emplace_back(std::filesystem::path(configuration.url.key) / metadata_directory / manifest_name);
    }
}
|
||||
|
||||
/**
|
||||
* Manifest file has the following format: '/iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro'
|
||||
*
|
||||
* `manifest file` is different in format version V1 and V2 and has the following contents:
|
||||
* v1 v2
|
||||
* status req req
|
||||
* snapshot_id req opt
|
||||
* sequence_number opt
|
||||
* file_sequence_number opt
|
||||
* data_file req req
|
||||
* Example format version V1:
|
||||
* ┌─status─┬─────────snapshot_id─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
|
||||
* │ 1 │ 2819310504515118887 │ ('/iceberg_data/db/table_name/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0) │
|
||||
* └────────┴─────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
* Example format version V2:
|
||||
* ┌─status─┬─────────snapshot_id─┬─sequence_number─┬─file_sequence_number─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
|
||||
* │ 1 │ 5887006101709926452 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ (0,'/iceberg_data/db/table_name/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0) │
|
||||
* └────────┴─────────────────────┴─────────────────┴──────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
* In case of partitioned data we'll have extra directory partition=value:
|
||||
* ─status─┬─────────snapshot_id─┬─data_file──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
|
||||
* │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=0/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00001.parquet','PARQUET',(0),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],NULL,[4],0) │
|
||||
* │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=1/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00002.parquet','PARQUET',(1),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'2')],[(1,'\0\0\0\0\0\0\0'),(2,'2')],NULL,[4],0) │
|
||||
* │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=2/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00003.parquet','PARQUET',(2),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'3')],[(1,'\0\0\0\0\0\0\0'),(2,'3')],NULL,[4],0) │
|
||||
* └────────┴─────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
*/
|
||||
/// Reads every manifest file (Avro) listed in `metadata.manifest_files` and returns the set of
/// data file paths to read. Only the first field (`status`) and the last field (`data_file`)
/// of each manifest entry are parsed.
///
/// Each returned path is the recorded data path cut so that it starts at `configuration.url.key`.
/// Entries with status 2 are logged as delete files and are not added to the result.
///
/// Throws BAD_ARGUMENTS if a manifest has too few fields or a data path does not contain
/// `configuration.url.key`, and ILLEGAL_COLUMN if a field or parsed column has an unexpected type.
Strings getFilesForRead(const Metadata & metadata, const Configuration & configuration, ContextPtr context)
{
    NameSet keys;
    for (const auto & manifest_file : metadata.manifest_files)
    {
        auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration);
        auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));

        avro::NodePtr root_node = file_reader->dataSchema().root();
        size_t leaves_num = root_node->leaves();
        /// V1 requires `status`, `snapshot_id` and `data_file`; V2 requires only `status` and `data_file`.
        size_t expected_min_num = metadata.format_version == 1 ? 3 : 2;
        if (leaves_num < expected_min_num)
        {
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS,
                "Unexpected number of columns {}. Expected at least {}",
                leaves_num, expected_min_num);
        }

        avro::NodePtr status_node = root_node->leafAt(0);
        if (status_node->type() != avro::Type::AVRO_INT)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `status` field should be Int type, got {}",
                magic_enum::enum_name(status_node->type()));
        }

        /// `data_file` is always the last field, the number of fields before it depends on the format version.
        avro::NodePtr data_file_node = root_node->leafAt(static_cast<int>(leaves_num) - 1);
        if (data_file_node->type() != avro::Type::AVRO_RECORD)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
                magic_enum::enum_name(data_file_node->type()));
        }

        auto status_col_data_type = AvroSchemaReader::avroNodeToDataType(status_node);
        auto data_col_data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
        Block header{
            {status_col_data_type->createColumn(), status_col_data_type, "status"},
            {data_col_data_type->createColumn(), data_col_data_type, "data_file"}};

        const auto columns = parseAvro(*file_reader, header, getFormatSettings(context));
        if (columns.size() != 2)
        {
            throw Exception(ErrorCodes::ILLEGAL_COLUMN,
                            "Unexpected number of columns. Expected 2, got {}", columns.size());
        }

        if (columns.at(0)->getDataType() != TypeIndex::Int32)
        {
            throw Exception(ErrorCodes::ILLEGAL_COLUMN,
                            "The parsed column from Avro file of `status` field should be Int32 type, got {}",
                            columns.at(0)->getFamilyName());
        }
        if (columns.at(1)->getDataType() != TypeIndex::Tuple)
        {
            throw Exception(ErrorCodes::ILLEGAL_COLUMN,
                            "The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
                            columns.at(1)->getFamilyName());
        }

        const auto status_int_column = assert_cast<ColumnInt32 *>(columns.at(0).get());
        const auto data_file_tuple_column = assert_cast<ColumnTuple *>(columns.at(1).get());

        if (status_int_column->size() != data_file_tuple_column->size())
        {
            throw Exception(ErrorCodes::ILLEGAL_COLUMN,
                            "The parsed column from Avro file of `status` and `data_file` have different rows number: {} and {}",
                            status_int_column->size(), data_file_tuple_column->size());
        }

        /// The file path is the first element of the `data_file` tuple in V1 and the second one in V2.
        const auto * data_file_name_column = metadata.format_version == 1
            ? data_file_tuple_column->getColumnPtr(0).get()
            : data_file_tuple_column->getColumnPtr(1).get();

        if (data_file_name_column->getDataType() != TypeIndex::String)
        {
            throw Exception(ErrorCodes::ILLEGAL_COLUMN,
                            "The parsed column from Avro file of `file_path` field should be String type, got {}",
                            data_file_name_column->getFamilyName());
        }
        auto file_name_str_column = assert_cast<const ColumnString *>(data_file_name_column);

        for (size_t i = 0; i < status_int_column->size(); ++i)
        {
            const auto status = status_int_column->getInt(i);
            const auto data_path = std::string(file_name_str_column->getDataAt(i).toView());
            const auto pos = data_path.find(configuration.url.key);
            /// Must be checked before substr(): substr(npos) throws std::out_of_range.
            if (pos == std::string::npos)
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration.url.key, data_path);

            const auto file_path = data_path.substr(pos);

            if (status == 2)
            {
                LOG_TEST(log, "Processing delete file for path: {}", file_path);
                chassert(!keys.contains(file_path));
            }
            else
                keys.insert(file_path);
        }
    }

    return std::vector<std::string>(keys.begin(), keys.end());
}
|
||||
|
||||
/// Deserializes all rows of an Avro data file into columns that have the structure of `header`.
/// Calls `file_reader.init()` itself, so the reader must not be initialized by the caller.
MutableColumns parseAvro(
    avro::DataFileReaderBase & file_reader,
    const Block & header,
    const FormatSettings & settings)
{
    AvroDeserializer row_deserializer(header, file_reader.dataSchema(), true, true, settings);
    MutableColumns result_columns = header.cloneEmptyColumns();

    file_reader.init();
    RowReadExtension row_extension;
    for (;;)
    {
        if (!file_reader.hasMore())
            break;

        file_reader.decr();
        row_deserializer.deserializeRow(result_columns, file_reader.decoder(), row_extension);
    }

    return result_columns;
}
|
||||
|
||||
};
|
||||
|
||||
|
||||
template <typename Configuration, typename MetadataReadHelper>
|
||||
IcebergMetadataParser<Configuration, MetadataReadHelper>::IcebergMetadataParser() : impl(std::make_unique<Impl>())
|
||||
{
|
||||
}
|
||||
|
||||
template <typename Configuration, typename MetadataReadHelper>
|
||||
Strings IcebergMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr context)
|
||||
{
|
||||
auto metadata = impl->processMetadataFile(configuration, context);
|
||||
|
||||
/// When table first created and does not have any data
|
||||
if (metadata.manifest_list.empty())
|
||||
return {};
|
||||
|
||||
impl->processManifestList(metadata, configuration, context);
|
||||
return impl->getFilesForRead(metadata, configuration, context);
|
||||
}
|
||||
|
||||
|
||||
template IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::IcebergMetadataParser();
|
||||
template Strings IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(const StorageS3::Configuration & configuration, ContextPtr);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -1,26 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
|
||||
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Core/Types.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
template <typename Configuration, typename MetadataReadHelper>
|
||||
struct IcebergMetadataParser
|
||||
{
|
||||
public:
|
||||
IcebergMetadataParser<Configuration, MetadataReadHelper>();
|
||||
|
||||
Strings getFiles(const Configuration & configuration, ContextPtr context);
|
||||
|
||||
private:
|
||||
struct Impl;
|
||||
std::shared_ptr<Impl> impl;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -1,25 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/DataLakes/IStorageDataLake.h>
|
||||
#include <Storages/DataLakes/IcebergMetadataParser.h>
|
||||
#include "config.h"
|
||||
|
||||
#if USE_AWS_S3 && USE_AVRO
|
||||
#include <Storages/DataLakes/S3MetadataReader.h>
|
||||
#include <Storages/StorageS3.h>
|
||||
#endif
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Holds the name of the Iceberg table engine.
struct StorageIcebergName
{
    static constexpr const char * name = "Iceberg";
};
|
||||
|
||||
#if USE_AWS_S3 && USE_AVRO
|
||||
using StorageIcebergS3 = IStorageDataLake<StorageS3, StorageIcebergName, IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
|
||||
#endif
|
||||
|
||||
}
|
@ -4,7 +4,7 @@
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include <Storages/DataLakes/StorageDeltaLake.h>
|
||||
#include <Storages/DataLakes/StorageIceberg.h>
|
||||
#include <Storages/DataLakes/Iceberg/StorageIceberg.h>
|
||||
#include <Storages/DataLakes/StorageHudi.h>
|
||||
|
||||
|
||||
@ -35,7 +35,7 @@ void registerStorageDeltaLake(StorageFactory & factory)
|
||||
|
||||
void registerStorageIceberg(StorageFactory & factory)
|
||||
{
|
||||
REGISTER_DATA_LAKE_STORAGE(StorageIcebergS3, StorageIcebergName::name)
|
||||
REGISTER_DATA_LAKE_STORAGE(StorageIceberg, StorageIceberg::name)
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -33,7 +33,7 @@ protected:
|
||||
if (TableFunction::configuration.structure != "auto")
|
||||
columns = parseColumnsListFromString(TableFunction::configuration.structure, context);
|
||||
|
||||
StoragePtr storage = std::make_shared<Storage>(
|
||||
StoragePtr storage = Storage::create(
|
||||
TableFunction::configuration, context, StorageID(TableFunction::getDatabaseName(), table_name),
|
||||
columns, ConstraintsDescription{}, String{}, std::nullopt);
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
#if USE_AWS_S3 && USE_AVRO
|
||||
|
||||
#include <Storages/DataLakes/StorageIceberg.h>
|
||||
#include <Storages/DataLakes/Iceberg/StorageIceberg.h>
|
||||
#include <TableFunctions/ITableFunctionDataLake.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <TableFunctions/TableFunctionS3.h>
|
||||
@ -17,7 +17,7 @@ struct TableFunctionIcebergName
|
||||
static constexpr auto name = "iceberg";
|
||||
};
|
||||
|
||||
using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIcebergS3, TableFunctionS3>;
|
||||
using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIceberg, TableFunctionS3>;
|
||||
|
||||
void registerTableFunctionIceberg(TableFunctionFactory & factory)
|
||||
{
|
||||
|
@ -85,3 +85,4 @@
|
||||
01940_custom_tld_sharding_key
|
||||
02815_range_dict_no_direct_join
|
||||
02861_join_on_nullsafe_compare
|
||||
01019_alter_materialized_view_consistent
|
||||
|
@ -41,6 +41,10 @@ def get_spark():
|
||||
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
|
||||
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
|
||||
.config("spark.sql.catalog.spark_catalog.warehouse", "/iceberg_data")
|
||||
.config(
|
||||
"spark.sql.extensions",
|
||||
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
|
||||
)
|
||||
.master("local")
|
||||
)
|
||||
return builder.master("local").getOrCreate()
|
||||
@ -129,12 +133,12 @@ def generate_data(spark, start, end):
|
||||
return df
|
||||
|
||||
|
||||
def create_iceberg_table(node, table_name):
|
||||
def create_iceberg_table(node, table_name, format="Parquet"):
|
||||
node.query(
|
||||
f"""
|
||||
DROP TABLE IF EXISTS {table_name};
|
||||
CREATE TABLE {table_name}
|
||||
ENGINE=Iceberg(s3, filename = 'iceberg_data/default/{table_name}/')"""
|
||||
ENGINE=Iceberg(s3, filename = 'iceberg_data/default/{table_name}/', format={format})"""
|
||||
)
|
||||
|
||||
|
||||
@ -165,7 +169,7 @@ def test_single_iceberg_file(started_cluster, format_version):
|
||||
bucket = started_cluster.minio_bucket
|
||||
TABLE_NAME = "test_single_iceberg_file_" + format_version
|
||||
|
||||
inserted_data = "SELECT number, toString(number) FROM numbers(100)"
|
||||
inserted_data = "SELECT number, toString(number) as string FROM numbers(100)"
|
||||
parquet_data_path = create_initial_data_file(
|
||||
started_cluster, instance, inserted_data, TABLE_NAME
|
||||
)
|
||||
@ -308,7 +312,7 @@ def test_types(started_cluster, format_version):
|
||||
[
|
||||
["a", "Nullable(Int32)"],
|
||||
["b", "Nullable(String)"],
|
||||
["c", "Nullable(Date32)"],
|
||||
["c", "Nullable(Date)"],
|
||||
["d", "Array(Nullable(String))"],
|
||||
["e", "Nullable(Bool)"],
|
||||
]
|
||||
@ -367,3 +371,147 @@ def test_delete_files(started_cluster, format_version):
|
||||
)
|
||||
|
||||
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 50
|
||||
|
||||
|
||||
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_evolved_schema(started_cluster, format_version):
    """Reading a table whose schema was changed with ALTER TABLE must fail with UNSUPPORTED_METHOD."""
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    minio = started_cluster.minio_client
    bucket_name = started_cluster.minio_bucket
    table = "test_evolved_schema_" + format_version

    write_iceberg_from_df(
        spark_session,
        generate_data(spark_session, 0, 100),
        table,
        mode="overwrite",
        format_version=format_version,
    )
    upload_directory(minio, bucket_name, f"/iceberg_data/default/{table}/", "")

    create_iceberg_table(node, table)

    # Before the schema change all 100 rows are readable.
    row_count = int(node.query(f"SELECT count() FROM {table}"))
    assert row_count == 100

    spark_session.sql(f"ALTER TABLE {table} ADD COLUMNS (x bigint)")
    upload_directory(minio, bucket_name, f"/iceberg_data/default/{table}/", "")

    error = node.query_and_get_error(f"SELECT * FROM {table}")
    assert "UNSUPPORTED_METHOD" in error
|
||||
|
||||
|
||||
def test_row_based_deletes(started_cluster):
    """Reading a table after a merge-on-read DELETE must fail with UNSUPPORTED_METHOD."""
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    minio = started_cluster.minio_client
    bucket_name = started_cluster.minio_bucket
    table = "test_row_based_deletes"

    spark_session.sql(
        f"CREATE TABLE {table} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')"
    )
    spark_session.sql(
        f"INSERT INTO {table} select id, char(id + ascii('a')) from range(100)"
    )
    upload_directory(minio, bucket_name, f"/iceberg_data/default/{table}/", "")

    create_iceberg_table(node, table)

    # Before the delete all 100 inserted rows are readable.
    row_count = int(node.query(f"SELECT count() FROM {table}"))
    assert row_count == 100

    spark_session.sql(f"DELETE FROM {table} WHERE id < 10")
    upload_directory(minio, bucket_name, f"/iceberg_data/default/{table}/", "")

    error = node.query_and_get_error(f"SELECT * FROM {table}")
    assert "UNSUPPORTED_METHOD" in error
|
||||
|
||||
|
||||
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_schema_inference(started_cluster, format_version):
    """For Parquet, ORC and Avro data files: check the table structure reported by DESC and that the data can be read."""
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    minio = started_cluster.minio_client
    bucket_name = started_cluster.minio_bucket

    # Column name and expected type, in the order of the columns of the Spark table.
    expected_structure = [
        ["intC", "Nullable(Int32)"],
        ["longC", "Nullable(Int64)"],
        ["floatC", "Nullable(Float32)"],
        ["doubleC", "Nullable(Float64)"],
        ["decimalC1", "Nullable(Decimal(10, 3))"],
        ["decimalC2", "Nullable(Decimal(20, 10))"],
        ["decimalC3", "Nullable(Decimal(38, 30))"],
        ["dateC", "Nullable(Date)"],
        ["timestampC", "Nullable(DateTime64(6, 'UTC'))"],
        ["stringC", "Nullable(String)"],
        ["binaryC", "Nullable(String)"],
        ["arrayC1", "Array(Nullable(Int32))"],
        ["mapC1", "Map(String, Nullable(String))"],
        ["structC1", "Tuple(field1 Nullable(Int32), field2 Nullable(String))"],
        [
            "complexC",
            "Array(Tuple(field1 Map(String, Array(Map(String, Nullable(Int32)))), field2 Tuple(field3 Nullable(Int32), field4 Nullable(String))))",
        ],
    ]

    for data_format in ("Parquet", "ORC", "Avro"):
        table = "test_schema_inference_" + data_format + "_" + format_version

        # Types time, timestamptz, fixed are not supported in Spark.
        spark_session.sql(
            f"CREATE TABLE {table} (intC int, longC long, floatC float, doubleC double, decimalC1 decimal(10, 3), decimalC2 decimal(20, 10), decimalC3 decimal(38, 30), dateC date, timestampC timestamp, stringC string, binaryC binary, arrayC1 array<int>, mapC1 map<string, string>, structC1 struct<field1: int, field2: string>, complexC array<struct<field1: map<string, array<map<string, int>>>, field2: struct<field3: int, field4: string>>>) USING iceberg TBLPROPERTIES ('format-version' = '{format_version}', 'write.format.default' = '{data_format}')"
        )

        spark_session.sql(
            f"insert into {table} select 42, 4242, 42.42, 4242.4242, decimal(42.42), decimal(42.42), decimal(42.42), date('2020-01-01'), timestamp('2020-01-01 20:00:00'), 'hello', binary('hello'), array(1,2,3), map('key', 'value'), struct(42, 'hello'), array(struct(map('key', array(map('key', 42))), struct(42, 'hello')))"
        )

        upload_directory(minio, bucket_name, f"/iceberg_data/default/{table}/", "")

        create_iceberg_table(node, table, data_format)

        described = node.query(f"DESC {table} FORMAT TSVRaw")
        assert described == TSV(expected_structure)

        # Check that we can parse data
        node.query(f"SELECT * FROM {table}")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_metadata_file_selection(started_cluster, format_version):
    """After 50 inserts of 10 rows each, the table must return all 500 rows."""
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    minio = started_cluster.minio_client
    bucket_name = started_cluster.minio_bucket
    table = "test_metadata_selection_" + format_version

    spark_session.sql(
        f"CREATE TABLE {table} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')"
    )

    insert_count = 50
    for _ in range(insert_count):
        spark_session.sql(
            f"INSERT INTO {table} select id, char(id + ascii('a')) from range(10)"
        )

    upload_directory(minio, bucket_name, f"/iceberg_data/default/{table}/", "")

    create_iceberg_table(node, table)

    row_count = int(node.query(f"SELECT count() FROM {table}"))
    assert row_count == 500
|
||||
|
18
tests/performance/enum_in_set.xml
Normal file
18
tests/performance/enum_in_set.xml
Normal file
@ -0,0 +1,18 @@
|
||||
<test>
|
||||
<!-- high cardinality -->
|
||||
<create_query>
|
||||
CREATE TABLE iso_3166_1_alpha_2
|
||||
(
|
||||
`c` Enum8('LI' = -128, 'LT' = -127, 'LU' = -126, 'MO' = -125, 'MK' = -124, 'MG' = -123, 'MW' = -122, 'MY' = -121, 'MV' = -120, 'ML' = -119, 'MT' = -118, 'MH' = -117, 'MQ' = -116, 'MR' = -115, 'MU' = -114, 'YT' = -113, 'MX' = -112, 'FM' = -111, 'MD' = -110, 'MC' = -109, 'MN' = -108, 'ME' = -107, 'MS' = -106, 'MA' = -105, 'MZ' = -104, 'MM' = -103, 'NA' = -102, 'NR' = -101, 'NP' = -100, 'NL' = -99, 'NC' = -98, 'NZ' = -97, 'NI' = -96, 'NE' = -95, 'NG' = -94, 'NU' = -93, 'NF' = -92, 'MP' = -91, 'NO' = -90, 'OM' = -89, 'PK' = -88, 'PW' = -87, 'PS' = -86, 'PA' = -85, 'PG' = -84, 'PY' = -83, 'PE' = -82, 'PH' = -81, 'PN' = -80, 'PL' = -79, 'PT' = -78, 'PR' = -77, 'QA' = -76, 'RE' = -75, 'RO' = -74, 'RU' = -73, 'RW' = -72, 'BL' = -71, 'SH' = -70, 'KN' = -69, 'LC' = -68, 'MF' = -67, 'PM' = -66, 'VC' = -65, 'WS' = -64, 'SM' = -63, 'ST' = -62, 'SA' = -61, 'SN' = -60, 'RS' = -59, 'SC' = -58, 'SL' = -57, 'SG' = -56, 'SX' = -55, 'SK' = -54, 'SI' = -53, 'SB' = -52, 'SO' = -51, 'ZA' = -50, 'GS' = -49, 'SS' = -48, 'ES' = -47, 'LK' = -46, 'SD' = -45, 'SR' = -44, 'SJ' = -43, 'SZ' = -42, 'SE' = -41, 'CH' = -40, 'SY' = -39, 'TW' = -38, 'TJ' = -37, 'TZ' = -36, 'TH' = -35, 'TL' = -34, 'TG' = -33, 'TK' = -32, 'TO' = -31, 'TT' = -30, 'TN' = -29, 'TR' = -28, 'TM' = -27, 'TC' = -26, 'TV' = -25, 'UG' = -24, 'UA' = -23, 'AE' = -22, 'GB' = -21, 'UM' = -20, 'US' = -19, 'UY' = -18, 'UZ' = -17, 'VU' = -16, 'VE' = -15, 'VN' = -14, 'VG' = -13, 'VI' = -12, 'WF' = -11, 'EH' = -10, 'YE' = -9, 'ZM' = -8, 'ZW' = -7, 'OTHER' = 0, 'AF' = 1, 'AX' = 2, 'AL' = 3, 'DZ' = 4, 'AS' = 5, 'AD' = 6, 'AO' = 7, 'AI' = 8, 'AQ' = 9, 'AG' = 10, 'AR' = 11, 'AM' = 12, 'AW' = 13, 'AU' = 14, 'AT' = 15, 'AZ' = 16, 'BS' = 17, 'BH' = 18, 'BD' = 19, 'BB' = 20, 'BY' = 21, 'BE' = 22, 'BZ' = 23, 'BJ' = 24, 'BM' = 25, 'BT' = 26, 'BO' = 27, 'BQ' = 28, 'BA' = 29, 'BW' = 30, 'BV' = 31, 'BR' = 32, 'IO' = 33, 'BN' = 34, 'BG' = 35, 'BF' = 36, 'BI' = 37, 'CV' = 38, 'KH' = 39, 'CM' = 40, 'CA' = 41, 'KY' = 42, 'CF' = 43, 'TD' = 44, 'CL' = 45, 
'CN' = 46, 'CX' = 47, 'CC' = 48, 'CO' = 49, 'KM' = 50, 'CD' = 51, 'CG' = 52, 'CK' = 53, 'CR' = 54, 'CI' = 55, 'HR' = 56, 'CU' = 57, 'CW' = 58, 'CY' = 59, 'CZ' = 60, 'DK' = 61, 'DJ' = 62, 'DM' = 63, 'DO' = 64, 'EC' = 65, 'EG' = 66, 'SV' = 67, 'GQ' = 68, 'ER' = 69, 'EE' = 70, 'ET' = 71, 'FK' = 72, 'FO' = 73, 'FJ' = 74, 'FI' = 75, 'FR' = 76, 'GF' = 77, 'PF' = 78, 'TF' = 79, 'GA' = 80, 'GM' = 81, 'GE' = 82, 'DE' = 83, 'GH' = 84, 'GI' = 85, 'GR' = 86, 'GL' = 87, 'GD' = 88, 'GP' = 89, 'GU' = 90, 'GT' = 91, 'GG' = 92, 'GN' = 93, 'GW' = 94, 'GY' = 95, 'HT' = 96, 'HM' = 97, 'VA' = 98, 'HN' = 99, 'HK' = 100, 'HU' = 101, 'IS' = 102, 'IN' = 103, 'ID' = 104, 'IR' = 105, 'IQ' = 106, 'IE' = 107, 'IM' = 108, 'IL' = 109, 'IT' = 110, 'JM' = 111, 'JP' = 112, 'JE' = 113, 'JO' = 114, 'KZ' = 115, 'KE' = 116, 'KI' = 117, 'KP' = 118, 'KR' = 119, 'KW' = 120, 'KG' = 121, 'LA' = 122, 'LV' = 123, 'LB' = 124, 'LS' = 125, 'LR' = 126, 'LY' = 127)
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY tuple()
|
||||
SETTINGS index_granularity = 8192
|
||||
</create_query>
|
||||
<fill_query>INSERT INTO iso_3166_1_alpha_2 SELECT (rand(number) % 256) - 128 FROM numbers(200000000)</fill_query>
|
||||
<fill_query>OPTIMIZE TABLE iso_3166_1_alpha_2 FINAL</fill_query>
|
||||
|
||||
<query>SELECT count() FROM iso_3166_1_alpha_2 WHERE c NOT IN ('CU', 'BN', 'VI', 'US', 'AQ', 'AG', 'AR', 'AM', 'AW', 'AU', 'AT', 'AZ', 'BS', 'BH', 'BD', 'BB', 'BY', 'BE') FORMAT Null SETTINGS max_threads = 1</query>
|
||||
|
||||
<drop_query>DROP TABLE IF EXISTS iso_3166_1_alpha_2</drop_query>
|
||||
</test>
|
@ -0,0 +1,4 @@
|
||||
Code: 62. DB::Ex---tion: When creating a materialized view you can't declare both 'TO [db].[table]' and 'EMPTY'. (SYNTAX_ERROR) (version reference)
|
||||
Code: 62. DB::Ex---tion: When creating a materialized view you can't declare both 'TO [db].[table]' and 'POPULATE'. (SYNTAX_ERROR) (version reference)
|
||||
Code: 62. DB::Ex---tion: When creating a materialized view you can't declare both 'TO [db].[table]' and 'ENGINE'. (SYNTAX_ERROR) (version reference)
|
||||
Code: 62. DB::Ex---tion: When creating a materialized view you can't declare both 'ENGINE' and 'TO [db].[table]'. (SYNTAX_ERROR) (version reference)
|
11
tests/queries/0_stateless/02900_matview_create_to_errors.sh
Executable file
11
tests/queries/0_stateless/02900_matview_create_to_errors.sh
Executable file
@ -0,0 +1,11 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
|
||||
# Each request below posts one CREATE MATERIALIZED VIEW statement that combines 'TO b' with
# EMPTY, POPULATE or ENGINE (ENGINE is tried both after and before the TO clause).
# The sed filter rewrites the first 'Exception' on each line to 'Ex---tion' and replaces everything
# from 'version ' to the end of the line with 'version reference)'.
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}" -d 'create materialized view aaaa TO b EMPTY as Select * from a;' | sed -e 's/Exception/Ex---tion/ ; s/version .*/version reference)/g'
|
||||
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}" -d 'create materialized view aaaa TO b POPULATE as Select * from a;' | sed -e 's/Exception/Ex---tion/ ; s/version .*/version reference)/g'
|
||||
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}" -d 'create materialized view aaaa TO b ENGINE = MergeTree() as Select * from a;' | sed -e 's/Exception/Ex---tion/ ; s/version .*/version reference)/g'
|
||||
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}" -d 'create materialized view aaaa ENGINE = MergeTree() TO b as Select * from a;' | sed -e 's/Exception/Ex---tion/ ; s/version .*/version reference)/g'
|
@ -0,0 +1,5 @@
|
||||
false
|
||||
false
|
||||
false
|
||||
|
||||
|
@ -0,0 +1,45 @@
|
||||
-- https://github.com/ClickHouse/ClickHouse/issues/55843
|
||||
-- These tests pass without the fix when either of
|
||||
-- - optimize_read_in_window_order = 0 and optimize_read_in_order = 0
|
||||
-- - ratio_of_defaults_for_sparse_serialization = 1
|
||||
-- However it is better to leave the settings as randomized because we run
|
||||
-- stateless tests quite a few times during a PR, so if a bug is introduced
|
||||
-- then there is a big chance of catching it. Furthermore, randomized settings
|
||||
-- might identify new bugs.
|
||||
|
||||
-- Case 1: a MergeTree table partitioned by toYYYYMM(time) and ordered by (key, id, time),
-- filled with a single row. The three SELECTs evaluate last_value(value) over a window ordered
-- by time: with PARTITION BY id and the key = 3 filter, without PARTITION BY but with the
-- filter, and with PARTITION BY id but without the filter.
CREATE TABLE test1
|
||||
(
|
||||
id String,
|
||||
time DateTime64(9),
|
||||
key Int64,
|
||||
value Bool,
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
PARTITION BY toYYYYMM(time)
|
||||
ORDER BY (key, id, time);
|
||||
|
||||
INSERT INTO test1 VALUES ('id0', now(), 3, false)
|
||||
|
||||
SELECT last_value(value) OVER (PARTITION BY id ORDER BY time ASC) as last_value
|
||||
FROM test1
|
||||
WHERE (key = 3);
|
||||
|
||||
SELECT last_value(value) OVER (ORDER BY time ASC) as last_value
|
||||
FROM test1
|
||||
WHERE (key = 3);
|
||||
|
||||
SELECT last_value(value) OVER (PARTITION BY id ORDER BY time ASC) as last_value
|
||||
FROM test1;
|
||||
|
||||
|
||||
|
||||
-- Case 2: the table is created and filled in one statement from SELECT 0, '' (a single row),
-- then any(value) and last_value(value) are evaluated over a window ordered by time.
-- NOTE(review): 0 and '' look like they were chosen as column default values so that sparse
-- serialization applies (see the settings mentioned at the top of this test) -- confirm.
CREATE TABLE test2
|
||||
(
|
||||
time DateTime,
|
||||
value String
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY (time) AS SELECT 0, '';
|
||||
|
||||
SELECT any(value) OVER (ORDER BY time ASC) FROM test2;
|
||||
SELECT last_value(value) OVER (ORDER BY time ASC) FROM test2;
|
@ -0,0 +1,15 @@
|
||||
1
|
||||
02901_parallel_replicas_rollup-default Used parallel replicas: true
|
||||
0 0 0 6
|
||||
2019 0 0 2
|
||||
2019 1 0 2
|
||||
2019 1 5 1
|
||||
2019 1 15 1
|
||||
2020 0 0 4
|
||||
2020 1 0 2
|
||||
2020 1 5 1
|
||||
2020 1 15 1
|
||||
2020 10 0 2
|
||||
2020 10 5 1
|
||||
2020 10 15 1
|
||||
02901_parallel_replicas_rollup2-default Used parallel replicas: true
|
74
tests/queries/0_stateless/02901_parallel_replicas_rollup.sh
Executable file
74
tests/queries/0_stateless/02901_parallel_replicas_rollup.sh
Executable file
@ -0,0 +1,74 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
# Reports whether the query with initial_query_id $1 was executed with parallel replicas.
# Argument: $1 - the initial_query_id to look up in system.query_log.
# Output (TSV): the initial_query_id and 'Used parallel replicas: true' or '... false'.
# The flag is true when at least one query_log row for that initial query has a query_id
# different from initial_query_id. SYSTEM FLUSH LOGS is issued before system.query_log is read.
function were_parallel_replicas_used ()
|
||||
{
|
||||
$CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS"
|
||||
|
||||
# Not using current_database = '$CLICKHOUSE_DATABASE' as nested parallel queries aren't run with it
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
SELECT
|
||||
initial_query_id,
|
||||
concat('Used parallel replicas: ', (countIf(initial_query_id != query_id) != 0)::bool::String) as used
|
||||
FROM system.query_log
|
||||
WHERE event_date >= yesterday()
|
||||
AND initial_query_id = '$1'
|
||||
GROUP BY initial_query_id
|
||||
ORDER BY min(event_time_microseconds) ASC
|
||||
FORMAT TSV"
|
||||
}
|
||||
|
||||
# Case 1: GROUP BY ... WITH ROLLUP over a one-row table, executed under a fixed query_id with the
# parallel-replicas settings listed below; afterwards the query log is checked for that query_id.
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS nested"
|
||||
$CLICKHOUSE_CLIENT --query "CREATE TABLE nested (x UInt8) ENGINE = MergeTree ORDER BY () AS Select 1";
|
||||
|
||||
query_id="02901_parallel_replicas_rollup-$CLICKHOUSE_DATABASE"
|
||||
$CLICKHOUSE_CLIENT \
|
||||
--query_id "${query_id}" \
|
||||
--max_parallel_replicas 3 \
|
||||
--prefer_localhost_replica 1 \
|
||||
--use_hedged_requests 0 \
|
||||
--cluster_for_parallel_replicas "parallel_replicas" \
|
||||
--allow_experimental_parallel_reading_from_replicas 1 \
|
||||
--parallel_replicas_for_non_replicated_merge_tree 1 \
|
||||
--parallel_replicas_min_number_of_rows_per_replica 0 \
|
||||
--query "
|
||||
SELECT 1 FROM nested
|
||||
GROUP BY 1 WITH ROLLUP
|
||||
ORDER BY max((SELECT 1 WHERE 0));
|
||||
";
|
||||
# Quoted so the id is always passed as a single argument, matching --query_id "${query_id}" above.
were_parallel_replicas_used "$query_id"
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS nested"
|
||||
|
||||
|
||||
# Case 2: three-level GROUP BY year, month, day WITH ROLLUP over six rows, executed under a fixed
# query_id with the same parallel-replicas settings; afterwards the query log is checked for that id.
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS days"
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
CREATE TABLE days
|
||||
(
|
||||
year Int64,
|
||||
month Int64,
|
||||
day Int64
|
||||
)
|
||||
ENGINE = MergeTree()
|
||||
ORDER BY year";
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
INSERT INTO days VALUES (2019, 1, 5), (2019, 1, 15), (2020, 1, 5), (2020, 1, 15), (2020, 10, 5), (2020, 10, 15);
|
||||
";
|
||||
|
||||
# Note that we enforce ordering of the final output because it's not guaranteed by GROUP BY ROLLUP, only the values of count() are
|
||||
query_id="02901_parallel_replicas_rollup2-$CLICKHOUSE_DATABASE"
|
||||
$CLICKHOUSE_CLIENT \
|
||||
--query_id "${query_id}" \
|
||||
--max_parallel_replicas 3 \
|
||||
--prefer_localhost_replica 1 \
|
||||
--use_hedged_requests 0 \
|
||||
--cluster_for_parallel_replicas "parallel_replicas" \
|
||||
--allow_experimental_parallel_reading_from_replicas 1 \
|
||||
--parallel_replicas_for_non_replicated_merge_tree 1 \
|
||||
--parallel_replicas_min_number_of_rows_per_replica 0 \
|
||||
--query "SELECT * FROM (SELECT year, month, day, count(*) FROM days GROUP BY year, month, day WITH ROLLUP) ORDER BY 1, 2, 3";
|
||||
|
||||
# Quoted so the id is always passed as a single argument, matching --query_id "${query_id}" above.
were_parallel_replicas_used "$query_id"
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS days"
|
@ -1455,6 +1455,7 @@ farmFingerprint
|
||||
farmHash
|
||||
fastops
|
||||
fcoverage
|
||||
fibonacci
|
||||
fifo
|
||||
filesystem
|
||||
filesystemAvailable
|
||||
|
Loading…
Reference in New Issue
Block a user