Merge remote-tracking branch 'origin/master' into pr-right-joins

Igor Nikonov 2024-11-05 19:11:07 +00:00
commit a237cc82fb
35 changed files with 737 additions and 736 deletions

contrib/SimSIMD vendored

@ -1 +1 @@
Subproject commit 935fef2964bc38e995c5f465b42259a35b8cf0d3
Subproject commit ee3c9c9c00b51645f62a1a9e99611b78c0052a21


@ -1,4 +1,8 @@
set(SIMSIMD_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/SimSIMD")
add_library(_simsimd INTERFACE)
target_include_directories(_simsimd SYSTEM INTERFACE "${SIMSIMD_PROJECT_DIR}/include")
# See contrib/usearch-cmake/CMakeLists.txt, why only enabled on x86
if (ARCH_AMD64)
set(SIMSIMD_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/SimSIMD")
set(SIMSIMD_SRCS ${SIMSIMD_PROJECT_DIR}/c/lib.c)
add_library(_simsimd ${SIMSIMD_SRCS})
target_include_directories(_simsimd SYSTEM PUBLIC "${SIMSIMD_PROJECT_DIR}/include")
target_compile_definitions(_simsimd PUBLIC SIMSIMD_DYNAMIC_DISPATCH)
endif()

contrib/usearch vendored

@ -1 +1 @@
Subproject commit 53799b84ca9ad708b060d0b1cfa5f039371721cd
Subproject commit 7efe8b710c9831bfe06573b1df0fad001b04a2b5


@ -6,12 +6,63 @@ target_include_directories(_usearch SYSTEM INTERFACE ${USEARCH_PROJECT_DIR}/incl
target_link_libraries(_usearch INTERFACE _fp16)
target_compile_definitions(_usearch INTERFACE USEARCH_USE_FP16LIB)
# target_compile_definitions(_usearch INTERFACE USEARCH_USE_SIMSIMD)
# ^^ simsimd is not enabled at the moment. Reasons:
# - Vectorization is important for raw scans but not so much for HNSW. We use usearch only for HNSW.
# - Simsimd supports either compile-time dispatch (choice of SIMD kernels determined by capabilities of the build machine) or dynamic dispatch (SIMD
# kernels chosen at runtime based on the cpuid instruction). Since current builds are limited to SSE 4.2 (x86) and NEON (ARM), the speedup of
# the former would be moderate compared to AVX-512 / SVE. The latter is at the moment too fragile with respect to portability across x86
# and ARM machines ... certain combinations of quantizations / distance functions / SIMD instructions are not implemented at the moment.
# Only x86 for now. On ARM, the linker goes down in flames. To make SimSIMD compile, I had to remove macro checks in SimSIMD
# for AVX512 (x86, which worked nicely) and __ARM_BF16_FORMAT_ALTERNATIVE. The ARM linker failure is probably because of that.
if (ARCH_AMD64)
target_link_libraries(_usearch INTERFACE _simsimd)
target_compile_definitions(_usearch INTERFACE USEARCH_USE_SIMSIMD)
target_compile_definitions(_usearch INTERFACE USEARCH_CAN_COMPILE_FLOAT16)
target_compile_definitions(_usearch INTERFACE USEARCH_CAN_COMPILE_BF16)
endif ()
add_library(ch_contrib::usearch ALIAS _usearch)
# Cf. https://github.com/llvm/llvm-project/issues/107810 (though it is not 100% the same stack)
#
# LLVM ERROR: Cannot select: 0x7996e7a73150: f32,ch = load<(load (s16) from %ir.22, !tbaa !54231), anyext from bf16> 0x79961cb737c0, 0x7996e7a1a500, undef:i64, ./contrib/SimSIMD/include/simsimd/dot.h:215:1
# 0x7996e7a1a500: i64 = add 0x79961e770d00, Constant:i64<-16>, ./contrib/SimSIMD/include/simsimd/dot.h:215:1
# 0x79961e770d00: i64,ch = CopyFromReg 0x79961cb737c0, Register:i64 %4, ./contrib/SimSIMD/include/simsimd/dot.h:215:1
# 0x7996e7a1ae10: i64 = Register %4
# 0x7996e7a1b5f0: i64 = Constant<-16>
# 0x7996e7a1a730: i64 = undef
# In function: _ZL23simsimd_dot_bf16_serialPKu6__bf16S0_yPd
# PLEASE submit a bug report to https://github.com/llvm/llvm-project/issues/ and include the crash backtrace.
# Stack dump:
# 0. Running pass 'Function Pass Manager' on module 'src/libdbms.a(MergeTreeIndexVectorSimilarity.cpp.o at 2312737440)'.
# 1. Running pass 'AArch64 Instruction Selection' on function '@_ZL23simsimd_dot_bf16_serialPKu6__bf16S0_yPd'
# #0 0x00007999e83a63bf llvm::sys::PrintStackTrace(llvm::raw_ostream&, int) (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0xda63bf)
# #1 0x00007999e83a44f9 llvm::sys::RunSignalHandlers() (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0xda44f9)
# #2 0x00007999e83a6b00 (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0xda6b00)
# #3 0x00007999e6e45320 (/lib/x86_64-linux-gnu/libc.so.6+0x45320)
# #4 0x00007999e6e9eb1c pthread_kill (/lib/x86_64-linux-gnu/libc.so.6+0x9eb1c)
# #5 0x00007999e6e4526e raise (/lib/x86_64-linux-gnu/libc.so.6+0x4526e)
# #6 0x00007999e6e288ff abort (/lib/x86_64-linux-gnu/libc.so.6+0x288ff)
# #7 0x00007999e82fe0c2 llvm::report_fatal_error(llvm::Twine const&, bool) (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0xcfe0c2)
# #8 0x00007999e8c2f8e3 (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x162f8e3)
# #9 0x00007999e8c2ed76 llvm::SelectionDAGISel::SelectCodeCommon(llvm::SDNode*, unsigned char const*, unsigned int) (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x162ed76)
# #10 0x00007999ea1adbcb (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x2badbcb)
# #11 0x00007999e8c2611f llvm::SelectionDAGISel::DoInstructionSelection() (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x162611f)
# #12 0x00007999e8c25790 llvm::SelectionDAGISel::CodeGenAndEmitDAG() (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x1625790)
# #13 0x00007999e8c248de llvm::SelectionDAGISel::SelectAllBasicBlocks(llvm::Function const&) (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x16248de)
# #14 0x00007999e8c22934 llvm::SelectionDAGISel::runOnMachineFunction(llvm::MachineFunction&) (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x1622934)
# #15 0x00007999e87826b9 llvm::MachineFunctionPass::runOnFunction(llvm::Function&) (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x11826b9)
# #16 0x00007999e84f7772 llvm::FPPassManager::runOnFunction(llvm::Function&) (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0xef7772)
# #17 0x00007999e84fd2f4 llvm::FPPassManager::runOnModule(llvm::Module&) (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0xefd2f4)
# #18 0x00007999e84f7e9f llvm::legacy::PassManagerImpl::run(llvm::Module&) (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0xef7e9f)
# #19 0x00007999e99f7d61 (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x23f7d61)
# #20 0x00007999e99f8c91 (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x23f8c91)
# #21 0x00007999e99f8b10 llvm::lto::thinBackend(llvm::lto::Config const&, unsigned int, std::function<llvm::Expected<std::unique_ptr<llvm::CachedFileStream, std::default_delete<llvm::CachedFileStream>>> (unsigned int, llvm::Twine const&)>, llvm::Module&, llvm::ModuleSummaryIndex const&, llvm::DenseMap<llvm::StringRef, std::unordered_set<unsigned long, std::hash<unsigned long>, std::equal_to<unsigned long>, std::allocator<unsigned long>>, llvm::DenseMapInfo<llvm::StringRef, void
# >, llvm::detail::DenseMapPair<llvm::StringRef, std::unordered_set<unsigned long, std::hash<unsigned long>, std::equal_to<unsigned long>, std::allocator<unsigned long>>>> const&, llvm::DenseMap<unsigned long, llvm::GlobalValueSummary*, llvm::DenseMapInfo<unsigned long, void>, llvm::detail::DenseMapPair<unsigned long, llvm::GlobalValueSummary*>> const&, llvm::MapVector<llvm::StringRef, llvm::BitcodeModule, llvm::DenseMap<llvm::StringRef, unsigned int, llvm::DenseMapInfo<llvm::S
# tringRef, void>, llvm::detail::DenseMapPair<llvm::StringRef, unsigned int>>, llvm::SmallVector<std::pair<llvm::StringRef, llvm::BitcodeModule>, 0u>>*, std::vector<unsigned char, std::allocator<unsigned char>> const&) (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x23f8b10)
# #22 0x00007999e99f248d (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x23f248d)
# #23 0x00007999e99f1cd6 (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0x23f1cd6)
# #24 0x00007999e82c9beb (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0xcc9beb)
# #25 0x00007999e834ebe3 llvm::ThreadPool::processTasks(llvm::ThreadPoolTaskGroup*) (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0xd4ebe3)
# #26 0x00007999e834f704 (/usr/lib/llvm-18/bin/../lib/libLLVM.so.18.1+0xd4f704)
# #27 0x00007999e6e9ca94 (/lib/x86_64-linux-gnu/libc.so.6+0x9ca94)
# #28 0x00007999e6f29c3c (/lib/x86_64-linux-gnu/libc.so.6+0x129c3c)
# clang++-18: error: unable to execute command: Aborted (core dumped)
# clang++-18: error: linker command failed due to signal (use -v to see invocation)
# ^[[A^Cninja: build stopped: interrupted by user.
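As a side note on the dispatch trade-off described in the comments above, here is a minimal, generic C++ sketch of compile-time vs. dynamic SIMD dispatch. It is an illustration only, not SimSIMD's actual mechanism or API: the kernel names (dot_scalar, dot_avx2, pickDotKernel) are made up, and a production version would build the AVX2 kernel with a per-function target attribute so the rest of the binary stays portable.

// Sketch: contrasting compile-time and runtime selection of a SIMD kernel (hypothetical names).
#include <cstddef>
#if defined(__AVX2__)
#include <immintrin.h>
#endif

static float dot_scalar(const float * a, const float * b, std::size_t n)
{
    float sum = 0.0f;
    for (std::size_t i = 0; i < n; ++i)
        sum += a[i] * b[i];
    return sum;
}

#if defined(__AVX2__)
/// Compile-time dispatch: this kernel exists only if the build machine enabled AVX2 (e.g. -mavx2),
/// so the binary as a whole assumes AVX2-capable hardware.
static float dot_avx2(const float * a, const float * b, std::size_t n)
{
    __m256 acc = _mm256_setzero_ps();
    std::size_t i = 0;
    for (; i + 8 <= n; i += 8)
        acc = _mm256_add_ps(acc, _mm256_mul_ps(_mm256_loadu_ps(a + i), _mm256_loadu_ps(b + i)));
    alignas(32) float tmp[8];
    _mm256_store_ps(tmp, acc);
    float sum = tmp[0] + tmp[1] + tmp[2] + tmp[3] + tmp[4] + tmp[5] + tmp[6] + tmp[7];
    for (; i < n; ++i)
        sum += a[i] * b[i];
    return sum;
}
#endif

using DotFn = float (*)(const float *, const float *, std::size_t);

/// Dynamic dispatch: decide once at runtime which kernel the executing CPU can actually run.
static DotFn pickDotKernel()
{
#if defined(__AVX2__)
    if (__builtin_cpu_supports("avx2")) /// GCC/Clang builtin, x86 only
        return dot_avx2;
#endif
    return dot_scalar;
}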


@ -5623,7 +5623,7 @@ If true, and JOIN can be executed with parallel replicas algorithm, and all stor
DECLARE(UInt64, parallel_replicas_mark_segment_size, 0, R"(
Parts are virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change unless you're absolutely sure what you're doing. Value should be in range [128; 16384]
)", BETA) \
DECLARE(Bool, parallel_replicas_local_plan, false, R"(
DECLARE(Bool, parallel_replicas_local_plan, true, R"(
Build local plan for local replica
)", BETA) \
\


@ -74,6 +74,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"backup_restore_keeper_max_retries_while_handling_error", 0, 20, "New setting."},
{"backup_restore_finish_timeout_after_error_sec", 0, 180, "New setting."},
{"query_plan_join_inner_table_selection", "auto", "auto", "New setting."},
{"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"},
}
},
{"24.10",


@ -724,7 +724,10 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
{
if (secret_arguments.are_named)
{
assert_cast<const ASTFunction *>(argument.get())->arguments->children[0]->formatImpl(settings, state, nested_dont_need_parens);
if (const auto * func_ast = typeid_cast<const ASTFunction *>(argument.get()))
func_ast->arguments->children[0]->formatImpl(settings, state, nested_dont_need_parens);
else
argument->formatImpl(settings, state, nested_dont_need_parens);
settings.ostr << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : "");
}
if (!secret_arguments.replacement.empty())
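The change above stops assuming that every named secret argument is an ASTFunction: the unconditional assert_cast becomes a typeid_cast guarded by a null check, with a plain-formatting fallback. A minimal, self-contained sketch of that guard pattern, using generic node types and dynamic_cast in place of ClickHouse's AST classes and typeid_cast (all names below are hypothetical):

#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Node
{
    virtual ~Node() = default;
    virtual void format(std::ostream & out) const = 0;
};

struct Literal : Node
{
    std::string text;
    explicit Literal(std::string t) : text(std::move(t)) {}
    void format(std::ostream & out) const override { out << text; }
};

struct KeyValue : Node
{
    std::string key;
    std::string value;
    KeyValue(std::string k, std::string v) : key(std::move(k)), value(std::move(v)) {}
    void format(std::ostream & out) const override { out << key << " = " << value; }
};

/// Guarded downcast: hide the value only if the argument really is a key-value pair;
/// otherwise fall back to generic formatting instead of aborting on a failed cast.
void formatSecretArgument(const Node & node, std::ostream & out)
{
    if (const auto * kv = dynamic_cast<const KeyValue *>(&node))
        out << kv->key << " = '[HIDDEN]'";
    else
        node.format(out);
}

int main()
{
    std::vector<std::unique_ptr<Node>> args;
    args.emplace_back(std::make_unique<KeyValue>("password", "secret"));
    args.emplace_back(std::make_unique<Literal>("plain_argument"));
    for (const auto & arg : args)
    {
        formatSecretArgument(*arg, std::cout);
        std::cout << '\n';
    }
    return 0;
}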


@ -0,0 +1,112 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/ObjectStorage/Azure/Configuration.h>
#include <Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/Local/Configuration.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/StorageFactory.h>
#include <Common/logger_useful.h>
#include <memory>
namespace DB
{
template <typename T>
concept StorageConfiguration = std::derived_from<T, StorageObjectStorage::Configuration>;
template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
class DataLakeConfiguration : public BaseStorageConfiguration, public std::enable_shared_from_this<StorageObjectStorage::Configuration>
{
public:
using Configuration = StorageObjectStorage::Configuration;
bool isDataLakeConfiguration() const override { return true; }
std::string getEngineName() const override { return DataLakeMetadata::name; }
void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
{
BaseStorageConfiguration::update(object_storage, local_context);
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), local_context);
if (current_metadata && *current_metadata == *new_metadata)
return;
current_metadata = std::move(new_metadata);
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles());
BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns());
}
std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const override
{
if (!current_metadata)
return std::nullopt;
auto schema_from_metadata = current_metadata->getTableSchema();
if (!schema_from_metadata.empty())
{
return ColumnsDescription(std::move(schema_from_metadata));
}
return std::nullopt;
}
private:
DataLakeMetadataPtr current_metadata;
ReadFromFormatInfo prepareReadingFromFormat(
ObjectStoragePtr object_storage,
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
ContextPtr local_context) override
{
auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns);
if (!current_metadata)
{
current_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), local_context);
}
auto column_mapping = current_metadata->getColumnNameToPhysicalNameMapping();
if (!column_mapping.empty())
{
for (const auto & [column_name, physical_name] : column_mapping)
{
auto & column = info.format_header.getByName(column_name);
column.name = physical_name;
}
}
return info;
}
};
#if USE_AVRO
#if USE_AWS_S3
using StorageS3IcebergConfiguration = DataLakeConfiguration<StorageS3Configuration, IcebergMetadata>;
# endif
#if USE_AZURE_BLOB_STORAGE
using StorageAzureIcebergConfiguration = DataLakeConfiguration<StorageAzureConfiguration, IcebergMetadata>;
# endif
#if USE_HDFS
using StorageHDFSIcebergConfiguration = DataLakeConfiguration<StorageHDFSConfiguration, IcebergMetadata>;
# endif
using StorageLocalIcebergConfiguration = DataLakeConfiguration<StorageLocalConfiguration, IcebergMetadata>;
#endif
#if USE_PARQUET
#if USE_AWS_S3
using StorageS3DeltaLakeConfiguration = DataLakeConfiguration<StorageS3Configuration, DeltaLakeMetadata>;
# endif
#endif
#if USE_AWS_S3
using StorageS3HudiConfiguration = DataLakeConfiguration<StorageS3Configuration, HudiMetadata>;
#endif
}
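The metadata types used by this new configuration now observe the owning configuration through ConfigurationObserverPtr, i.e. a std::weak_ptr that is lock()-ed on each use, instead of holding a shared_ptr that would keep the configuration alive. Below is a minimal sketch of that observer pattern under simplified, hypothetical names; the real class derives from std::enable_shared_from_this precisely so it can hand out weak_from_this(), as in update() above.

#include <filesystem>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>

struct Configuration : std::enable_shared_from_this<Configuration>
{
    std::string path;
    explicit Configuration(std::string path_) : path(std::move(path_)) {}

    /// Hands out a non-owning observer, analogous to passing weak_from_this() into DataLakeMetadata::create().
    std::weak_ptr<Configuration> observer() { return weak_from_this(); }
};

using ConfigurationPtr = std::shared_ptr<Configuration>;
using ConfigurationObserverPtr = std::weak_ptr<Configuration>;

class Metadata
{
public:
    explicit Metadata(ConfigurationObserverPtr configuration_) : configuration(std::move(configuration_)) {}

    std::filesystem::path metadataDirectory() const
    {
        /// Observe, don't own: promote the weak_ptr only for the duration of this call.
        auto configuration_ptr = configuration.lock();
        if (!configuration_ptr)
            throw std::runtime_error("Configuration is already destroyed");
        return std::filesystem::path(configuration_ptr->path) / "_metadata";
    }

private:
    ConfigurationObserverPtr configuration; /// weak reference, no ownership cycle with the configuration
};

int main()
{
    ConfigurationPtr config = std::make_shared<Configuration>("/data/table");
    Metadata metadata(config->observer());
    std::cout << metadata.metadataDirectory() << '\n';
    return 0;
}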


@ -56,22 +56,18 @@ namespace ErrorCodes
struct DeltaLakeMetadataImpl
{
using ConfigurationPtr = DeltaLakeMetadata::ConfigurationPtr;
using ConfigurationObserverPtr = DeltaLakeMetadata::ConfigurationObserverPtr;
ObjectStoragePtr object_storage;
ConfigurationPtr configuration;
ConfigurationObserverPtr configuration;
ContextPtr context;
/**
* Useful links:
* - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files
*/
DeltaLakeMetadataImpl(ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ContextPtr context_)
: object_storage(object_storage_)
, configuration(configuration_)
, context(context_)
DeltaLakeMetadataImpl(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_)
: object_storage(object_storage_), configuration(configuration_), context(context_)
{
}
@ -111,6 +107,7 @@ struct DeltaLakeMetadataImpl
};
DeltaLakeMetadata processMetadataFiles()
{
auto configuration_ptr = configuration.lock();
std::set<String> result_files;
NamesAndTypesList current_schema;
DataLakePartitionColumns current_partition_columns;
@ -122,7 +119,7 @@ struct DeltaLakeMetadataImpl
while (true)
{
const auto filename = withPadding(++current_version) + metadata_file_suffix;
const auto file_path = std::filesystem::path(configuration->getPath()) / deltalake_metadata_directory / filename;
const auto file_path = std::filesystem::path(configuration_ptr->getPath()) / deltalake_metadata_directory / filename;
if (!object_storage->exists(StoredObject(file_path)))
break;
@ -136,7 +133,7 @@ struct DeltaLakeMetadataImpl
}
else
{
const auto keys = listFiles(*object_storage, *configuration, deltalake_metadata_directory, metadata_file_suffix);
const auto keys = listFiles(*object_storage, *configuration_ptr, deltalake_metadata_directory, metadata_file_suffix);
for (const String & key : keys)
processMetadataFile(key, current_schema, current_partition_columns, result_files);
}
@ -246,6 +243,8 @@ struct DeltaLakeMetadataImpl
}
}
auto configuration_ptr = configuration.lock();
if (object->has("add"))
{
auto add_object = object->get("add").extract<Poco::JSON::Object::Ptr>();
@ -253,7 +252,7 @@ struct DeltaLakeMetadataImpl
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to extract `add` field");
auto path = add_object->getValue<String>("path");
result.insert(fs::path(configuration->getPath()) / path);
result.insert(fs::path(configuration_ptr->getPath()) / path);
auto filename = fs::path(path).filename().string();
auto it = file_partition_columns.find(filename);
@ -297,7 +296,7 @@ struct DeltaLakeMetadataImpl
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to extract `remove` field");
auto path = remove_object->getValue<String>("path");
result.erase(fs::path(configuration->getPath()) / path);
result.erase(fs::path(configuration_ptr->getPath()) / path);
}
}
}
@ -488,7 +487,9 @@ struct DeltaLakeMetadataImpl
*/
size_t readLastCheckpointIfExists() const
{
const auto last_checkpoint_file = std::filesystem::path(configuration->getPath()) / deltalake_metadata_directory / "_last_checkpoint";
auto configuration_ptr = configuration.lock();
const auto last_checkpoint_file
= std::filesystem::path(configuration_ptr->getPath()) / deltalake_metadata_directory / "_last_checkpoint";
if (!object_storage->exists(StoredObject(last_checkpoint_file)))
return 0;
@ -555,7 +556,11 @@ struct DeltaLakeMetadataImpl
return 0;
const auto checkpoint_filename = withPadding(version) + ".checkpoint.parquet";
const auto checkpoint_path = std::filesystem::path(configuration->getPath()) / deltalake_metadata_directory / checkpoint_filename;
auto configuration_ptr = configuration.lock();
const auto checkpoint_path
= std::filesystem::path(configuration_ptr->getPath()) / deltalake_metadata_directory / checkpoint_filename;
LOG_TRACE(log, "Using checkpoint file: {}", checkpoint_path.string());
@ -671,7 +676,7 @@ struct DeltaLakeMetadataImpl
}
LOG_TEST(log, "Adding {}", path);
const auto [_, inserted] = result.insert(std::filesystem::path(configuration->getPath()) / path);
const auto [_, inserted] = result.insert(std::filesystem::path(configuration_ptr->getPath()) / path);
if (!inserted)
throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path);
}
@ -682,10 +687,7 @@ struct DeltaLakeMetadataImpl
LoggerPtr log = getLogger("DeltaLakeMetadataParser");
};
DeltaLakeMetadata::DeltaLakeMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ContextPtr context_)
DeltaLakeMetadata::DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_)
{
auto impl = DeltaLakeMetadataImpl(object_storage_, configuration_, context_);
auto result = impl.processMetadataFiles();


@ -1,5 +1,9 @@
#pragma once
#include "config.h"
#if USE_PARQUET
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
@ -12,13 +16,10 @@ namespace DB
class DeltaLakeMetadata final : public IDataLakeMetadata
{
public:
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
static constexpr auto name = "DeltaLake";
DeltaLakeMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ContextPtr context_);
DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
Strings getDataFiles() const override { return data_files; }
@ -36,10 +37,7 @@ public:
&& data_files == deltalake_metadata->data_files;
}
static DataLakeMetadataPtr create(
ObjectStoragePtr object_storage,
ConfigurationPtr configuration,
ContextPtr local_context)
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context)
{
return std::make_unique<DeltaLakeMetadata>(object_storage, configuration, local_context);
}
@ -52,3 +50,5 @@ private:
};
}
#endif


@ -1,11 +1,10 @@
#include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
#include <Storages/ObjectStorage/DataLakes/Common.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <Storages/ObjectStorage/DataLakes/Common.h>
#include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
#include <base/find_symbols.h>
#include <Poco/String.h>
#include "config.h"
#include <IO/ReadHelpers.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -43,8 +42,9 @@ namespace ErrorCodes
*/
Strings HudiMetadata::getDataFilesImpl() const
{
auto configuration_ptr = configuration.lock();
auto log = getLogger("HudiMetadata");
const auto keys = listFiles(*object_storage, *configuration, "", Poco::toLower(configuration->format));
const auto keys = listFiles(*object_storage, *configuration_ptr, "", Poco::toLower(configuration_ptr->format));
using Partition = std::string;
using FileID = std::string;
@ -86,13 +86,8 @@ Strings HudiMetadata::getDataFilesImpl() const
return result;
}
HudiMetadata::HudiMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ContextPtr context_)
: WithContext(context_)
, object_storage(object_storage_)
, configuration(configuration_)
HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_)
: WithContext(context_), object_storage(object_storage_), configuration(configuration_)
{
}


@ -13,14 +13,11 @@ namespace DB
class HudiMetadata final : public IDataLakeMetadata, private WithContext
{
public:
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
static constexpr auto name = "Hudi";
HudiMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ContextPtr context_);
HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
Strings getDataFiles() const override;
@ -38,17 +35,14 @@ public:
&& data_files == hudi_metadata->data_files;
}
static DataLakeMetadataPtr create(
ObjectStoragePtr object_storage,
ConfigurationPtr configuration,
ContextPtr local_context)
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context)
{
return std::make_unique<HudiMetadata>(object_storage, configuration, local_context);
}
private:
const ObjectStoragePtr object_storage;
const ConfigurationPtr configuration;
const ConfigurationObserverPtr configuration;
mutable Strings data_files;
std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns;


@ -1,169 +0,0 @@
#pragma once
#include "config.h"
#if USE_AVRO
#include <Storages/IStorage.h>
#include <Storages/StorageFactory.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
#include <Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h>
#include <Common/logger_useful.h>
namespace DB
{
/// Storage for read-only integration with Apache Iceberg tables in Amazon S3 (see https://iceberg.apache.org/)
/// Right now it's implemented on top of StorageS3 and doesn't support
/// many Iceberg features like schema evolution, partitioning, positional and equality deletes.
template <typename DataLakeMetadata>
class IStorageDataLake final : public StorageObjectStorage
{
public:
using Storage = StorageObjectStorage;
using ConfigurationPtr = Storage::ConfigurationPtr;
static StoragePtr create(
ConfigurationPtr base_configuration,
ContextPtr context,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment_,
std::optional<FormatSettings> format_settings_,
LoadingStrictnessLevel mode)
{
auto object_storage = base_configuration->createObjectStorage(context, /* is_readonly */true);
DataLakeMetadataPtr metadata;
NamesAndTypesList schema_from_metadata;
const bool use_schema_from_metadata = columns_.empty();
if (base_configuration->format == "auto")
base_configuration->format = "Parquet";
ConfigurationPtr configuration = base_configuration->clone();
try
{
metadata = DataLakeMetadata::create(object_storage, base_configuration, context);
configuration->setPaths(metadata->getDataFiles());
if (use_schema_from_metadata)
schema_from_metadata = metadata->getTableSchema();
}
catch (...)
{
if (mode <= LoadingStrictnessLevel::CREATE)
throw;
metadata.reset();
configuration->setPaths({});
tryLogCurrentException(__PRETTY_FUNCTION__);
}
return std::make_shared<IStorageDataLake<DataLakeMetadata>>(
base_configuration, std::move(metadata), configuration, object_storage,
context, table_id_,
use_schema_from_metadata ? ColumnsDescription(schema_from_metadata) : columns_,
constraints_, comment_, format_settings_);
}
String getName() const override { return DataLakeMetadata::name; }
static ColumnsDescription getTableStructureFromData(
ObjectStoragePtr object_storage_,
ConfigurationPtr base_configuration,
const std::optional<FormatSettings> & format_settings_,
ContextPtr local_context)
{
auto metadata = DataLakeMetadata::create(object_storage_, base_configuration, local_context);
auto schema_from_metadata = metadata->getTableSchema();
if (!schema_from_metadata.empty())
{
return ColumnsDescription(std::move(schema_from_metadata));
}
ConfigurationPtr configuration = base_configuration->clone();
configuration->setPaths(metadata->getDataFiles());
std::string sample_path;
return Storage::resolveSchemaFromData(object_storage_, configuration, format_settings_, sample_path, local_context);
}
void updateConfiguration(ContextPtr local_context) override
{
Storage::updateConfiguration(local_context);
auto new_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context);
if (current_metadata && *current_metadata == *new_metadata)
return;
current_metadata = std::move(new_metadata);
auto updated_configuration = base_configuration->clone();
updated_configuration->setPaths(current_metadata->getDataFiles());
updated_configuration->setPartitionColumns(current_metadata->getPartitionColumns());
Storage::configuration = updated_configuration;
}
template <typename... Args>
IStorageDataLake(
ConfigurationPtr base_configuration_,
DataLakeMetadataPtr metadata_,
Args &&... args)
: Storage(std::forward<Args>(args)...)
, base_configuration(base_configuration_)
, current_metadata(std::move(metadata_))
{
if (base_configuration->format == "auto")
{
base_configuration->format = Storage::configuration->format;
}
if (current_metadata)
{
const auto & columns = current_metadata->getPartitionColumns();
base_configuration->setPartitionColumns(columns);
Storage::configuration->setPartitionColumns(columns);
}
}
private:
ConfigurationPtr base_configuration;
DataLakeMetadataPtr current_metadata;
ReadFromFormatInfo prepareReadingFromFormat(
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
ContextPtr local_context) override
{
auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns);
if (!current_metadata)
{
Storage::updateConfiguration(local_context);
current_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context);
}
auto column_mapping = current_metadata->getColumnNameToPhysicalNameMapping();
if (!column_mapping.empty())
{
for (const auto & [column_name, physical_name] : column_mapping)
{
auto & column = info.format_header.getByName(column_name);
column.name = physical_name;
}
}
return info;
}
};
using StorageIceberg = IStorageDataLake<IcebergMetadata>;
using StorageDeltaLake = IStorageDataLake<DeltaLakeMetadata>;
using StorageHudi = IStorageDataLake<HudiMetadata>;
}
#endif


@ -51,7 +51,7 @@ extern const int UNSUPPORTED_METHOD;
IcebergMetadata::IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ConfigurationObserverPtr configuration_,
DB::ContextPtr context_,
Int32 metadata_version_,
Int32 format_version_,
@ -382,12 +382,12 @@ std::pair<Int32, String> getMetadataFileAndVersion(
}
DataLakeMetadataPtr IcebergMetadata::create(
ObjectStoragePtr object_storage,
ConfigurationPtr configuration,
ContextPtr local_context)
DataLakeMetadataPtr
IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context)
{
const auto [metadata_version, metadata_file_path] = getMetadataFileAndVersion(object_storage, *configuration);
auto configuration_ptr = configuration.lock();
const auto [metadata_version, metadata_file_path] = getMetadataFileAndVersion(object_storage, *configuration_ptr);
auto log = getLogger("IcebergMetadata");
LOG_DEBUG(log, "Parse metadata {}", metadata_file_path);
@ -416,12 +416,13 @@ DataLakeMetadataPtr IcebergMetadata::create(
if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
{
const auto path = snapshot->getValue<String>("manifest-list");
manifest_list_file = std::filesystem::path(configuration->getPath()) / "metadata" / std::filesystem::path(path).filename();
manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
break;
}
}
return std::make_unique<IcebergMetadata>(object_storage, configuration, local_context, metadata_version, format_version, manifest_list_file, schema_id, schema);
return std::make_unique<IcebergMetadata>(
object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, schema_id, schema);
}
/**
@ -451,6 +452,7 @@ DataLakeMetadataPtr IcebergMetadata::create(
*/
Strings IcebergMetadata::getDataFiles() const
{
auto configuration_ptr = configuration.lock();
if (!data_files.empty())
return data_files;
@ -483,7 +485,7 @@ Strings IcebergMetadata::getDataFiles() const
{
const auto file_path = col_str->getDataAt(i).toView();
const auto filename = std::filesystem::path(file_path).filename();
manifest_files.emplace_back(std::filesystem::path(configuration->getPath()) / "metadata" / filename);
manifest_files.emplace_back(std::filesystem::path(configuration_ptr->getPath()) / "metadata" / filename);
}
NameSet files;
@ -618,9 +620,9 @@ Strings IcebergMetadata::getDataFiles() const
const auto status = status_int_column->getInt(i);
const auto data_path = std::string(file_path_string_column->getDataAt(i).toView());
const auto pos = data_path.find(configuration->getPath());
const auto pos = data_path.find(configuration_ptr->getPath());
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration->getPath(), data_path);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration_ptr->getPath(), data_path);
const auto file_path = data_path.substr(pos);


@ -1,5 +1,7 @@
#pragma once
#include "config.h"
#if USE_AVRO /// StorageIceberg depends on Avro to parse metadata in Avro format.
#include <Interpreters/Context_fwd.h>
@ -61,13 +63,13 @@ namespace DB
class IcebergMetadata : public IDataLakeMetadata, private WithContext
{
public:
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
static constexpr auto name = "Iceberg";
IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ConfigurationObserverPtr configuration_,
ContextPtr context_,
Int32 metadata_version_,
Int32 format_version_,
@ -92,16 +94,13 @@ public:
return iceberg_metadata && getVersion() == iceberg_metadata->getVersion();
}
static DataLakeMetadataPtr create(
ObjectStoragePtr object_storage,
ConfigurationPtr configuration,
ContextPtr local_context);
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
private:
size_t getVersion() const { return metadata_version; }
const ObjectStoragePtr object_storage;
const ConfigurationPtr configuration;
const ConfigurationObserverPtr configuration;
Int32 metadata_version;
Int32 format_version;
String manifest_list_file;


@ -1,154 +0,0 @@
#include "config.h"
#if USE_AWS_S3
# include <Storages/ObjectStorage/Azure/Configuration.h>
# include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
# include <Storages/ObjectStorage/DataLakes/IStorageDataLake.h>
# include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
# include <Storages/ObjectStorage/Local/Configuration.h>
# include <Storages/ObjectStorage/S3/Configuration.h>
#if USE_HDFS
# include <Storages/ObjectStorage/HDFS/Configuration.h>
#endif
namespace DB
{
#if USE_AVRO /// StorageIceberg depends on Avro to parse metadata in Avro format.
void registerStorageIceberg(StorageFactory & factory)
{
factory.registerStorage(
"Iceberg",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
factory.registerStorage(
"IcebergS3",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
factory.registerStorage(
"IcebergAzure",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageAzureConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), true);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::AZURE,
});
factory.registerStorage(
"IcebergLocal",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageLocalConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::FILE,
});
#if USE_HDFS
factory.registerStorage(
"IcebergHDFS",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageHDFSConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::HDFS,
});
#endif
}
#endif
#if USE_PARQUET
void registerStorageDeltaLake(StorageFactory & factory)
{
factory.registerStorage(
"DeltaLake",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageDeltaLake::create(
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
#endif
void registerStorageHudi(StorageFactory & factory)
{
factory.registerStorage(
"Hudi",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageHudi::create(
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
}
#endif


@ -14,14 +14,16 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Storages/StorageFactory.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/ObjectStorage/Utils.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/ObjectStorage/ReadBufferIterator.h>
#include <Storages/ObjectStorage/StorageObjectStorageSink.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorage/ReadBufferIterator.h>
#include <Storages/ObjectStorage/Utils.h>
#include <Storages/StorageFactory.h>
#include <Storages/VirtualColumnUtils.h>
#include "Databases/LoadingStrictnessLevel.h"
#include "Storages/ColumnsDescription.h"
namespace DB
@ -76,6 +78,7 @@ StorageObjectStorage::StorageObjectStorage(
const ConstraintsDescription & constraints_,
const String & comment,
std::optional<FormatSettings> format_settings_,
LoadingStrictnessLevel mode,
bool distributed_processing_,
ASTPtr partition_by_)
: IStorage(table_id_)
@ -86,9 +89,25 @@ StorageObjectStorage::StorageObjectStorage(
, distributed_processing(distributed_processing_)
, log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName())))
{
ColumnsDescription columns{columns_};
try
{
configuration->update(object_storage, context);
}
catch (...)
{
// If we don't have the format or schema yet, we can't ignore a failed configuration update, because the relevant configuration is crucial for format and schema inference
if (mode <= LoadingStrictnessLevel::CREATE || columns_.empty() || (configuration->format == "auto"))
{
throw;
}
else
{
tryLogCurrentException(log);
}
}
std::string sample_path;
ColumnsDescription columns{columns_};
resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context);
configuration->check(context);
@ -124,12 +143,11 @@ bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) c
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context, format_settings);
}
void StorageObjectStorage::updateConfiguration(ContextPtr context)
void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage_ptr, ContextPtr context)
{
IObjectStorage::ApplyNewSettingsOptions options{ .allow_client_change = !configuration->isStaticConfiguration() };
object_storage->applyNewSettings(context->getConfigRef(), configuration->getTypeName() + ".", context, options);
IObjectStorage::ApplyNewSettingsOptions options{.allow_client_change = !isStaticConfiguration()};
object_storage_ptr->applyNewSettings(context->getConfigRef(), getTypeName() + ".", context, options);
}
namespace
{
class ReadFromObjectStorageStep : public SourceStepWithFilter
@ -243,7 +261,8 @@ private:
};
}
ReadFromFormatInfo StorageObjectStorage::prepareReadingFromFormat(
ReadFromFormatInfo StorageObjectStorage::Configuration::prepareReadingFromFormat(
ObjectStoragePtr,
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
@ -252,6 +271,11 @@ ReadFromFormatInfo StorageObjectStorage::prepareReadingFromFormat(
return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns);
}
std::optional<ColumnsDescription> StorageObjectStorage::Configuration::tryGetTableStructureFromMetadata() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetTableStructureFromMetadata is not implemented for basic configuration");
}
void StorageObjectStorage::read(
QueryPlan & query_plan,
const Names & column_names,
@ -262,7 +286,7 @@ void StorageObjectStorage::read(
size_t max_block_size,
size_t num_streams)
{
updateConfiguration(local_context);
configuration->update(object_storage, local_context);
if (partition_by && configuration->withPartitionWildcard())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
@ -270,8 +294,8 @@ void StorageObjectStorage::read(
getName());
}
const auto read_from_format_info = prepareReadingFromFormat(
column_names, storage_snapshot, supportsSubsetOfColumns(local_context), local_context);
const auto read_from_format_info = configuration->prepareReadingFromFormat(
object_storage, column_names, storage_snapshot, supportsSubsetOfColumns(local_context), local_context);
const bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef()[Setting::optimize_count_from_files];
@ -300,7 +324,7 @@ SinkToStoragePtr StorageObjectStorage::write(
ContextPtr local_context,
bool /* async_insert */)
{
updateConfiguration(local_context);
configuration->update(object_storage, local_context);
const auto sample_block = metadata_snapshot->getSampleBlock();
const auto & settings = configuration->getQuerySettings(local_context);
@ -409,6 +433,16 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(
std::string & sample_path,
const ContextPtr & context)
{
if (configuration->isDataLakeConfiguration())
{
configuration->update(object_storage, context);
auto table_structure = configuration->tryGetTableStructureFromMetadata();
if (table_structure)
{
return table_structure.value();
}
}
ObjectInfos read_keys;
auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
auto schema = readSchemaFromFormat(configuration->format, format_settings, *iterator, context);
@ -489,10 +523,17 @@ void StorageObjectStorage::Configuration::initialize(
if (configuration.format == "auto")
{
configuration.format = FormatFactory::instance().tryGetFormatFromFileName(
configuration.isArchive()
? configuration.getPathInArchive()
: configuration.getPath()).value_or("auto");
if (configuration.isDataLakeConfiguration())
{
configuration.format = "Parquet";
}
else
{
configuration.format
= FormatFactory::instance()
.tryGetFormatFromFileName(configuration.isArchive() ? configuration.getPathInArchive() : configuration.getPath())
.value_or("auto");
}
}
else
FormatFactory::instance().checkFormatName(configuration.format);


@ -1,12 +1,13 @@
#pragma once
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Core/SchemaInferenceMode.h>
#include <Storages/IStorage.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Storages/IStorage.h>
#include <Storages/ObjectStorage/DataLakes/PartitionColumns.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Common/threadPoolCallbackRunner.h>
#include "Storages/ColumnsDescription.h"
namespace DB
{
@ -25,6 +26,7 @@ class StorageObjectStorage : public IStorage
public:
class Configuration;
using ConfigurationPtr = std::shared_ptr<Configuration>;
using ConfigurationObserverPtr = std::weak_ptr<Configuration>;
using ObjectInfo = RelativePathWithMetadata;
using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;
using ObjectInfos = std::vector<ObjectInfoPtr>;
@ -55,6 +57,7 @@ public:
const ConstraintsDescription & constraints_,
const String & comment,
std::optional<FormatSettings> format_settings_,
LoadingStrictnessLevel mode,
bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr);
@ -120,16 +123,8 @@ public:
const ContextPtr & context);
protected:
virtual void updateConfiguration(ContextPtr local_context);
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
virtual ReadFromFormatInfo prepareReadingFromFormat(
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
ContextPtr local_context);
static std::unique_ptr<ReadBufferIterator> createReadBufferIterator(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
@ -207,14 +202,29 @@ public:
void setPartitionColumns(const DataLakePartitionColumns & columns) { partition_columns = columns; }
const DataLakePartitionColumns & getPartitionColumns() const { return partition_columns; }
virtual bool isDataLakeConfiguration() const { return false; }
virtual ReadFromFormatInfo prepareReadingFromFormat(
ObjectStoragePtr object_storage,
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
ContextPtr local_context);
virtual std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const;
String format = "auto";
String compression_method = "auto";
String structure = "auto";
virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context);
protected:
virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0;
virtual void fromAST(ASTs & args, ContextPtr context, bool with_structure) = 0;
void assertInitialized() const;
bool initialized = false;


@ -2,6 +2,7 @@
#include <Core/Settings.h>
#include <Formats/FormatFactory.h>
#include <Storages/ObjectStorage/Azure/Configuration.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
@ -10,17 +11,19 @@
namespace DB
{
#if USE_AWS_S3 || USE_AZURE_BLOB_STORAGE || USE_HDFS
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
static std::shared_ptr<StorageObjectStorage> createStorageObjectStorage(
const StorageFactory::Arguments & args,
StorageObjectStorage::ConfigurationPtr configuration,
ContextPtr context)
namespace
{
// LocalObjectStorage is only supported for Iceberg Datalake operations where Avro format is required. For regular file access, use FileStorage instead.
#if USE_AWS_S3 || USE_AZURE_BLOB_STORAGE || USE_HDFS || USE_AVRO
std::shared_ptr<StorageObjectStorage>
createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObjectStorage::ConfigurationPtr configuration, ContextPtr context)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
@ -52,18 +55,20 @@ static std::shared_ptr<StorageObjectStorage> createStorageObjectStorage(
return std::make_shared<StorageObjectStorage>(
configuration,
configuration->createObjectStorage(context, /* is_readonly */false),
configuration->createObjectStorage(context, /* is_readonly */ false),
args.getContext(),
args.table_id,
args.columns,
args.constraints,
args.comment,
format_settings,
args.mode,
/* distributed_processing */ false,
partition_by);
}
#endif
}
#if USE_AZURE_BLOB_STORAGE
void registerStorageAzure(StorageFactory & factory)
@ -148,4 +153,133 @@ void registerStorageObjectStorage(StorageFactory & factory)
UNUSED(factory);
}
#if USE_AVRO /// StorageIceberg depends on Avro to parse metadata in Avro format.
void registerStorageIceberg(StorageFactory & factory)
{
#if USE_AWS_S3
factory.registerStorage(
"Iceberg",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3IcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
factory.registerStorage(
"IcebergS3",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3IcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
#endif
#if USE_AZURE_BLOB_STORAGE
factory.registerStorage(
"IcebergAzure",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageAzureIcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), true);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::AZURE,
});
#endif
#if USE_HDFS
factory.registerStorage(
"IcebergHDFS",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageHDFSIcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::HDFS,
});
#endif
factory.registerStorage(
"IcebergLocal",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageLocalIcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::FILE,
});
}
#endif
#if USE_PARQUET
void registerStorageDeltaLake(StorageFactory & factory)
{
#if USE_AWS_S3
factory.registerStorage(
"DeltaLake",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3DeltaLakeConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
#endif
UNUSED(factory);
}
#endif
void registerStorageHudi(StorageFactory & factory)
{
#if USE_AWS_S3
factory.registerStorage(
"Hudi",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3HudiConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
#endif
UNUSED(factory);
}
}


@ -41,10 +41,11 @@ void registerStorageS3Queue(StorageFactory & factory);
#if USE_PARQUET
void registerStorageDeltaLake(StorageFactory & factory);
#endif
#endif
#if USE_AVRO
void registerStorageIceberg(StorageFactory & factory);
#endif
#endif
#if USE_AZURE_BLOB_STORAGE
void registerStorageAzureQueue(StorageFactory & factory);
@ -140,6 +141,10 @@ void registerStorages(bool use_legacy_mongodb_integration [[maybe_unused]])
registerStorageAzureQueue(factory);
#endif
#if USE_AVRO
registerStorageIceberg(factory);
#endif
#if USE_AWS_S3
registerStorageHudi(factory);
registerStorageS3Queue(factory);
@ -148,14 +153,10 @@ void registerStorages(bool use_legacy_mongodb_integration [[maybe_unused]])
registerStorageDeltaLake(factory);
#endif
#if USE_AVRO
registerStorageIceberg(factory);
#endif
#endif
#endif
#if USE_HDFS
#if USE_HIVE
#if USE_HDFS
# if USE_HIVE
registerStorageHive(factory);
#endif
#endif


@ -1,126 +0,0 @@
#pragma once
#include "config.h"
#include <Access/Common/AccessFlags.h>
#include <Interpreters/Context.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/DataLakes/IStorageDataLake.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
{
template <typename Name, typename Storage, typename TableFunction>
class ITableFunctionDataLake : public TableFunction
{
public:
static constexpr auto name = Name::name;
std::string getName() const override { return name; }
protected:
StoragePtr executeImpl(
const ASTPtr & /* ast_function */,
ContextPtr context,
const std::string & table_name,
ColumnsDescription cached_columns,
bool /*is_insert_query*/) const override
{
ColumnsDescription columns;
auto configuration = TableFunction::getConfiguration();
if (configuration->structure != "auto")
columns = parseColumnsListFromString(configuration->structure, context);
else if (!cached_columns.empty())
columns = cached_columns;
StoragePtr storage = Storage::create(
configuration, context, StorageID(TableFunction::getDatabaseName(), table_name),
columns, ConstraintsDescription{}, String{}, std::nullopt, LoadingStrictnessLevel::CREATE);
storage->startup();
return storage;
}
const char * getStorageTypeName() const override { return name; }
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override
{
auto configuration = TableFunction::getConfiguration();
if (configuration->structure == "auto")
{
context->checkAccess(TableFunction::getSourceAccessType());
auto object_storage = TableFunction::getObjectStorage(context, !is_insert_query);
return Storage::getTableStructureFromData(object_storage, configuration, std::nullopt, context);
}
return parseColumnsListFromString(configuration->structure, context);
}
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override
{
auto configuration = TableFunction::getConfiguration();
configuration->format = "Parquet";
/// Set default format to Parquet if it's not specified in arguments.
TableFunction::parseArguments(ast_function, context);
}
};
struct TableFunctionIcebergName
{
static constexpr auto name = "iceberg";
};
struct TableFunctionIcebergS3Name
{
static constexpr auto name = "icebergS3";
};
struct TableFunctionIcebergAzureName
{
static constexpr auto name = "icebergAzure";
};
struct TableFunctionIcebergLocalName
{
static constexpr auto name = "icebergLocal";
};
struct TableFunctionIcebergHDFSName
{
static constexpr auto name = "icebergHDFS";
};
struct TableFunctionDeltaLakeName
{
static constexpr auto name = "deltaLake";
};
struct TableFunctionHudiName
{
static constexpr auto name = "hudi";
};
#if USE_AVRO
# if USE_AWS_S3
using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIceberg, TableFunctionS3>;
using TableFunctionIcebergS3 = ITableFunctionDataLake<TableFunctionIcebergS3Name, StorageIceberg, TableFunctionS3>;
# endif
# if USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzure = ITableFunctionDataLake<TableFunctionIcebergAzureName, StorageIceberg, TableFunctionAzureBlob>;
# endif
using TableFunctionIcebergLocal = ITableFunctionDataLake<TableFunctionIcebergLocalName, StorageIceberg, TableFunctionLocal>;
#if USE_HDFS
using TableFunctionIcebergHDFS = ITableFunctionDataLake<TableFunctionIcebergHDFSName, StorageIceberg, TableFunctionHDFS>;
#endif
#endif
#if USE_AWS_S3
# if USE_PARQUET
using TableFunctionDeltaLake = ITableFunctionDataLake<TableFunctionDeltaLakeName, StorageDeltaLake, TableFunctionS3>;
#endif
using TableFunctionHudi = ITableFunctionDataLake<TableFunctionHudiName, StorageHudi, TableFunctionS3>;
#endif
}


@ -15,7 +15,7 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/ColumnsDescription.h>
#include <TableFunctions/TableFunctionMongoDB.h>
namespace DB
{
@ -85,17 +85,11 @@ void TableFunctionMongoDB::parseArguments(const ASTPtr & ast_function, ContextPt
{
if (const auto * ast_func = typeid_cast<const ASTFunction *>(args[i].get()))
{
const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_func->arguments.get());
auto function_args = args_expr->children;
if (function_args.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
const auto & [arg_name, arg_value] = getKeyValueMongoDBArgument(ast_func);
if (arg_name == "structure")
structure = checkAndGetLiteralArgument<String>(function_args[1], "structure");
structure = checkAndGetLiteralArgument<String>(arg_value, arg_name);
else if (arg_name == "options")
main_arguments.push_back(function_args[1]);
main_arguments.push_back(arg_value);
}
else if (i == 5)
{
@ -117,15 +111,11 @@ void TableFunctionMongoDB::parseArguments(const ASTPtr & ast_function, ContextPt
{
if (const auto * ast_func = typeid_cast<const ASTFunction *>(args[i].get()))
{
const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_func->arguments.get());
auto function_args = args_expr->children;
if (function_args.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
const auto & [arg_name, arg_value] = getKeyValueMongoDBArgument(ast_func);
if (arg_name == "structure")
structure = checkAndGetLiteralArgument<String>(function_args[1], "structure");
structure = checkAndGetLiteralArgument<String>(arg_value, arg_name);
else if (arg_name == "options")
main_arguments.push_back(arg_value);
}
else if (i == 2)
{
@ -145,6 +135,20 @@ void TableFunctionMongoDB::parseArguments(const ASTPtr & ast_function, ContextPt
}
std::pair<String, ASTPtr> getKeyValueMongoDBArgument(const ASTFunction * ast_func)
{
const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_func->arguments.get());
const auto & function_args = args_expr->children;
if (function_args.size() != 2 || ast_func->name != "equals" || !function_args[0]->as<ASTIdentifier>())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument, got {}", ast_func->formatForErrorMessage());
const auto & arg_name = function_args[0]->as<ASTIdentifier>()->name();
if (arg_name == "structure" || arg_name == "options")
return std::make_pair(arg_name, function_args[1]);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument, got {}", ast_func->formatForErrorMessage());
}
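The helper only accepts equality pairs whose key is "structure" or "options"; anything else throws BAD_ARGUMENTS. A minimal sketch of the named-argument form this branch handles, with placeholder connection details:

-- 'structure' and 'options' are the only keys getKeyValueMongoDBArgument() allows; host, database and credentials are placeholders.
SELECT *
FROM mongodb('localhost:27017', 'test', 'my_collection', 'test_user', 'password',
             structure = 'id Int64, name String', options = 'retryWrites=false');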
void registerTableFunctionMongoDB(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionMongoDB>(

View File

@ -0,0 +1,16 @@
#pragma once
#include <Common/Exception.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/checkAndGetLiteralArgument.h>
namespace DB
{
std::pair<String, ASTPtr> getKeyValueMongoDBArgument(const ASTFunction * ast_func);
}

View File

@ -15,6 +15,7 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/ColumnsDescription.h>
#include <TableFunctions/TableFunctionMongoDB.h>
namespace DB
@ -97,17 +98,11 @@ void TableFunctionMongoDBPocoLegacy::parseArguments(const ASTPtr & ast_function,
{
if (const auto * ast_func = typeid_cast<const ASTFunction *>(args[i].get()))
{
const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_func->arguments.get());
auto function_args = args_expr->children;
if (function_args.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
const auto & [arg_name, arg_value] = getKeyValueMongoDBArgument(ast_func);
if (arg_name == "structure")
structure = checkAndGetLiteralArgument<String>(function_args[1], "structure");
structure = checkAndGetLiteralArgument<String>(arg_value, "structure");
else if (arg_name == "options")
main_arguments.push_back(function_args[1]);
main_arguments.push_back(arg_value);
}
else if (i == 5)
{

View File

@ -117,8 +117,9 @@ StoragePtr TableFunctionObjectStorage<Definition, Configuration>::executeImpl(
columns,
ConstraintsDescription{},
String{},
/* format_settings */std::nullopt,
/* distributed_processing */false,
/* format_settings */ std::nullopt,
/* mode */ LoadingStrictnessLevel::CREATE,
/* distributed_processing */ false,
nullptr);
storage->startup();
@ -224,4 +225,87 @@ template class TableFunctionObjectStorage<HDFSDefinition, StorageHDFSConfigurati
template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif
template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{
#if USE_AWS_S3
factory.registerFunction<TableFunctionIceberg>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store. Alias to icebergS3)",
.examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
factory.registerFunction<TableFunctionIcebergS3>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)",
.examples{{"icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
#if USE_AZURE_BLOB_STORAGE
factory.registerFunction<TableFunctionIcebergAzure>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store.)",
.examples{{"icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
#if USE_HDFS
factory.registerFunction<TableFunctionIcebergHDFS>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem.)",
.examples{{"icebergHDFS", "SELECT * FROM icebergHDFS(url)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
factory.registerFunction<TableFunctionIcebergLocal>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored locally.)",
.examples{{"icebergLocal", "SELECT * FROM icebergLocal(filename)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
#if USE_AWS_S3
#if USE_PARQUET
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDeltaLake>(
{.documentation
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store.)",
.examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
void registerTableFunctionHudi(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionHudi>(
{.documentation
= {.description = R"(The table function can be used to read the Hudi table stored on object store.)",
.examples{{"hudi", "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
void registerDataLakeTableFunctions(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AVRO
registerTableFunctionIceberg(factory);
#endif
#if USE_AWS_S3
#if USE_PARQUET
registerTableFunctionDeltaLake(factory);
#endif
registerTableFunctionHudi(factory);
#endif
}
}
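Each registration above exposes the corresponding name in SQL, guarded by the same build flags. Illustrative calls matching the documented examples, with placeholder URLs and credentials:

-- Only available in builds with USE_AWS_S3 (and USE_PARQUET for deltaLake).
SELECT * FROM deltaLake('https://example-bucket.s3.amazonaws.com/delta_table', 'access_key_id', 'secret_access_key') LIMIT 10;
SELECT * FROM hudi('https://example-bucket.s3.amazonaws.com/hudi_table', 'access_key_id', 'secret_access_key') LIMIT 10;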

View File

@ -2,6 +2,7 @@
#include <Disks/ObjectStorages/IObjectStorage_fwd.h>
#include <Formats/FormatFactory.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/VirtualColumnUtils.h>
#include <TableFunctions/ITableFunction.h>
@ -61,6 +62,48 @@ struct LocalDefinition
static constexpr auto storage_type_name = "Local";
};
struct IcebergDefinition
{
static constexpr auto name = "iceberg";
static constexpr auto storage_type_name = "S3";
};
struct IcebergS3Definition
{
static constexpr auto name = "icebergS3";
static constexpr auto storage_type_name = "S3";
};
struct IcebergAzureDefinition
{
static constexpr auto name = "icebergAzure";
static constexpr auto storage_type_name = "Azure";
};
struct IcebergLocalDefinition
{
static constexpr auto name = "icebergLocal";
static constexpr auto storage_type_name = "Local";
};
struct IcebergHDFSDefinition
{
static constexpr auto name = "icebergHDFS";
static constexpr auto storage_type_name = "HDFS";
};
struct DeltaLakeDefinition
{
static constexpr auto name = "deltaLake";
static constexpr auto storage_type_name = "S3";
};
struct HudiDefinition
{
static constexpr auto name = "hudi";
static constexpr auto storage_type_name = "S3";
};
template <typename Definition, typename Configuration>
class TableFunctionObjectStorage : public ITableFunction
{
@ -137,4 +180,25 @@ using TableFunctionHDFS = TableFunctionObjectStorage<HDFSDefinition, StorageHDFS
#endif
using TableFunctionLocal = TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
#if USE_AVRO
# if USE_AWS_S3
using TableFunctionIceberg = TableFunctionObjectStorage<IcebergDefinition, StorageS3IcebergConfiguration>;
using TableFunctionIcebergS3 = TableFunctionObjectStorage<IcebergS3Definition, StorageS3IcebergConfiguration>;
# endif
# if USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzure = TableFunctionObjectStorage<IcebergAzureDefinition, StorageAzureIcebergConfiguration>;
# endif
# if USE_HDFS
using TableFunctionIcebergHDFS = TableFunctionObjectStorage<IcebergHDFSDefinition, StorageHDFSIcebergConfiguration>;
# endif
using TableFunctionIcebergLocal = TableFunctionObjectStorage<IcebergLocalDefinition, StorageLocalIcebergConfiguration>;
#endif
#if USE_AWS_S3
# if USE_PARQUET
using TableFunctionDeltaLake = TableFunctionObjectStorage<DeltaLakeDefinition, StorageS3DeltaLakeConfiguration>;
# endif
using TableFunctionHudi = TableFunctionObjectStorage<HudiDefinition, StorageS3HudiConfiguration>;
#endif
}

View File

@ -41,9 +41,10 @@ StoragePtr TableFunctionObjectStorageCluster<Definition, Configuration>::execute
StorageID(Base::getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
/* comment */String{},
/* format_settings */std::nullopt, /// No format_settings
/* distributed_processing */true,
/* comment */ String{},
/* format_settings */ std::nullopt, /// No format_settings
/* mode */ LoadingStrictnessLevel::CREATE,
/* distributed_processing */ true,
/*partition_by_=*/nullptr);
}
else

View File

@ -1,96 +0,0 @@
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/ITableFunctionDataLake.h>
namespace DB
{
#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{
# if USE_AWS_S3
factory.registerFunction<TableFunctionIceberg>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store. Alias to icebergS3)",
.examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
factory.registerFunction<TableFunctionIcebergS3>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)",
.examples{{"icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
# endif
# if USE_AZURE_BLOB_STORAGE
factory.registerFunction<TableFunctionIcebergAzure>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store.)",
.examples{{"icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
# endif
# if USE_HDFS
factory.registerFunction<TableFunctionIcebergHDFS>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on HDFS object store.)",
.examples{{"icebergHDFS", "SELECT * FROM icebergHDFS(url)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
# endif
factory.registerFunction<TableFunctionIcebergLocal>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored locally.)",
.examples{{"icebergLocal", "SELECT * FROM icebergLocal(filename)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
#if USE_AWS_S3
# if USE_PARQUET
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDeltaLake>(
{
.documentation =
{
.description=R"(The table function can be used to read the DeltaLake table stored on object store.)",
.examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}
},
.allow_readonly = false
});
}
#endif
void registerTableFunctionHudi(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionHudi>(
{
.documentation =
{
.description=R"(The table function can be used to read the Hudi table stored on object store.)",
.examples{{"hudi", "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}
},
.allow_readonly = false
});
}
#endif
void registerDataLakeTableFunctions(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AVRO
registerTableFunctionIceberg(factory);
#endif
#if USE_AWS_S3
# if USE_PARQUET
registerTableFunctionDeltaLake(factory);
#endif
registerTableFunctionHudi(factory);
#endif
}
}

View File

@ -1,4 +1,4 @@
-- Tags: no-fasttest, long, no-asan, no-asan, no-ubsan, no-debug
-- Tags: no-fasttest, long, no-asan, no-ubsan, no-debug
-- ^^ Disable test for slow builds: generating data takes time but a sufficiently large data set
-- is necessary for different hnsw_candidate_list_size_for_search settings to make a difference
@ -14,7 +14,7 @@ CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similar
-- Generate random values but with a fixed seed (conceptually), so that the data is deterministic.
-- Unfortunately, no random function in ClickHouse accepts a seed. Instead, abuse the numbers table + hash functions to provide
-- deterministic randomness.
INSERT INTO tab SELECT number, [sipHash64(number)/18446744073709551615, wyHash64(number)/18446744073709551615] FROM numbers(370000); -- 18446744073709551615 is the biggest UInt64
INSERT INTO tab SELECT number, [sipHash64(number)/18446744073709551615, wyHash64(number)/18446744073709551615] FROM numbers(660000); -- 18446744073709551615 is the biggest UInt64
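The hash trick described in the comment above is easy to verify in isolation; a small sketch, independent of the test table, showing that the generated values are reproducible and fall into [0, 1]:

-- sipHash64(number) is deterministic, so dividing by the UInt64 maximum gives the same pseudo-random value in [0, 1] on every run.
SELECT number, sipHash64(number) / 18446744073709551615 AS r FROM numbers(5);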
-- hnsw_candidate_list_size_for_search = 0 is illegal
WITH [0.5, 0.5] AS reference_vec

View File

@ -28,3 +28,19 @@ SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
simple (global) join with analyzer and parallel replicas with local plan
4200000 4200000 4200000 -1400000
4200006 4200006 4200006 -1400002
4200012 4200012 4200012 -1400004
4200018 4200018 4200018 -1400006
4200024 4200024 4200024 -1400008
4200030 4200030 4200030 -1400010
4200036 4200036 4200036 -1400012
4200042 4200042 4200042 -1400014
4200048 4200048 4200048 -1400016
4200054 4200054 4200054 -1400018
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done

View File

@ -27,6 +27,8 @@ inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1"
PARALLEL_REPLICAS_SETTINGS="enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0"
##############
echo
echo "simple (global) join with analyzer and parallel replicas"
@ -35,17 +37,31 @@ $CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1, allow_experimental_parallel_reading_from_replicas = 2,
max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=0"
SETTINGS enable_analyzer=1, $PARALLEL_REPLICAS_SETTINGS, parallel_replicas_local_plan=0"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1, allow_experimental_parallel_reading_from_replicas = 2, send_logs_level='trace',
max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=0" 2>&1 |
SETTINGS enable_analyzer=1, send_logs_level='trace', $PARALLEL_REPLICAS_SETTINGS, parallel_replicas_local_plan=0" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'
echo
echo "simple (global) join with analyzer and parallel replicas with local plan"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS enable_analyzer=1, $PARALLEL_REPLICAS_SETTINGS, parallel_replicas_local_plan=1"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS enable_analyzer=1, send_logs_level='trace', $PARALLEL_REPLICAS_SETTINGS, parallel_replicas_local_plan=1" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'

View File

@ -11,7 +11,6 @@ simple (local) join with analyzer and parallel replicas
4200048 4200048 4200048 -1400016
4200054 4200054 4200054 -1400018
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
simple (local) join with analyzer and parallel replicas and full sorting merge join
@ -26,7 +25,6 @@ simple (local) join with analyzer and parallel replicas and full sorting merge j
4200048 4200048 4200048 -1400016
4200054 4200054 4200054 -1400018
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
nested join with analyzer
@ -53,5 +51,4 @@ nested join with analyzer and parallel replicas, both local
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4` ALL INNER JOIN (SELECT `__table6`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table6`) AS `__table5` ON `__table4`.`key` = `__table5`.`key` SETTINGS parallel_replicas_prefer_local_join = 1) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4` ALL INNER JOIN (SELECT `__table6`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table6`) AS `__table5` ON `__table4`.`key` = `__table5`.`key` SETTINGS parallel_replicas_prefer_local_join = 1) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done

View File

@ -17,6 +17,8 @@ insert into num_1 select number * 2, toString(number * 2) from numbers(1e7);
insert into num_2 select number * 3, -number from numbers(1.5e6);
"
PARALLEL_REPLICAS_SETTINGS="allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1, parallel_replicas_local_plan=1"
##############
echo
echo "simple (local) join with analyzer and parallel replicas"
@ -25,17 +27,13 @@ $CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1,
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1"
SETTINGS enable_analyzer=1, $PARALLEL_REPLICAS_SETTINGS"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1, send_logs_level='trace',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1" 2>&1 |
SETTINGS enable_analyzer=1, send_logs_level='trace', $PARALLEL_REPLICAS_SETTINGS" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'
@ -49,17 +47,13 @@ $CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1, join_algorithm='full_sorting_merge',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1"
SETTINGS enable_analyzer=1, join_algorithm='full_sorting_merge', $PARALLEL_REPLICAS_SETTINGS"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1, join_algorithm='full_sorting_merge', send_logs_level='trace',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1" 2>&1 |
SETTINGS enable_analyzer=1, join_algorithm='full_sorting_merge', send_logs_level='trace', $PARALLEL_REPLICAS_SETTINGS" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'
@ -74,7 +68,7 @@ select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings parallel_replicas_prefer_local_join=1) r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1"
SETTINGS enable_analyzer=1"
##############
@ -86,18 +80,14 @@ select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings parallel_replicas_prefer_local_join=1) r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1,
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1"
SETTINGS enable_analyzer=1, $PARALLEL_REPLICAS_SETTINGS"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings parallel_replicas_prefer_local_join=1) r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1, join_algorithm='full_sorting_merge', send_logs_level='trace',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1" 2>&1 |
SETTINGS enable_analyzer=1, join_algorithm='full_sorting_merge', send_logs_level='trace', $PARALLEL_REPLICAS_SETTINGS" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'

View File

@ -0,0 +1,14 @@
-- Tags: no-fasttest
SELECT * FROM mongodb('mongodb://some-cluster:27017/?retryWrites=false', NULL, 'my_collection', 'test_user', 'password', 'x Int32'); -- { serverError BAD_ARGUMENTS }
SELECT * FROM mongodb('mongodb://some-cluster:27017/?retryWrites=false', 'test', NULL, 'test_user', 'password', 'x Int32'); -- { serverError BAD_ARGUMENTS }
SELECT * FROM mongodb('mongodb://some-cluster:27017/?retryWrites=false', 'test', 'my_collection', NULL, 'password', 'x Int32'); -- { serverError BAD_ARGUMENTS }
SELECT * FROM mongodb('mongodb://some-cluster:27017/?retryWrites=false', 'test', 'my_collection', 'test_user', NULL, 'x Int32'); -- { serverError BAD_ARGUMENTS }
SELECT * FROM mongodb('mongodb://some-cluster:27017/?retryWrites=false', 'test', 'my_collection', 'test_user', 'password', NULL); -- { serverError BAD_ARGUMENTS }
SELECT * FROM mongodb('mongodb://some-cluster:27017/?retryWrites=false', 'test', 'my_collection', 'test_user', 'password', materialize(1) + 1); -- { serverError BAD_ARGUMENTS }
SELECT * FROM mongodb('mongodb://some-cluster:27017/?retryWrites=false', 'test', 'my_collection', 'test_user', 'password', 'x Int32', NULL); -- { serverError BAD_ARGUMENTS }
SELECT * FROM mongodb('mongodb://some-cluster:27017/?retryWrites=false', 'test', 'my_collection', 'test_user', 'password', NULL, 'x Int32'); -- { serverError BAD_ARGUMENTS }
SELECT * FROM mongodb('mongodb://some-cluster:27017/?retryWrites=false', 'test', 'my_collection', 'test_user', 'password', NULL, 'x Int32'); -- { serverError BAD_ARGUMENTS }
SELECT * FROM mongodb(NULL, 'test', 'my_collection', 'test_user', 'password', 'x Int32'); -- { serverError BAD_ARGUMENTS }
CREATE TABLE IF NOT EXISTS store_version ( `_id` String ) ENGINE = MongoDB(`localhost:27017`, mongodb, storeinfo, adminUser, adminUser); -- { serverError NAMED_COLLECTION_DOESNT_EXIST }