Compare commits

...

31 Commits

Author SHA1 Message Date
MikhailBurdukov
1b52d7e4b4
Merge 1cabe0f8be into 44b4bd38b9 2024-11-21 00:09:45 +01:00
Mikhail Artemenko
44b4bd38b9
Merge pull request #72045 from ClickHouse/issues/70174/cluster_versions
Enable cluster table functions for DataLake Storages
2024-11-20 21:22:37 +00:00
Shichao Jin
40c7d5fd1a
Merge pull request #71894 from udiz/fix-arrayWithConstant-size-estimation
Fix: arrayWithConstant size estimation using row's element size
2024-11-20 19:56:27 +00:00
MikhailBurdukov
1cabe0f8be Review 2024-11-20 15:37:30 +00:00
Mikhail Artemenko
4ccebd9a24 fix syntax for iceberg in docs 2024-11-20 11:15:39 +00:00
Mikhail Artemenko
99177c0daf remove icebergCluster alias 2024-11-20 11:15:12 +00:00
Mikhail Artemenko
0951991c1d update aspell-dict.txt 2024-11-19 13:10:42 +00:00
Mikhail Artemenko
19aec5e572 Merge branch 'issues/70174/cluster_versions' of github.com:ClickHouse/ClickHouse into issues/70174/cluster_versions 2024-11-19 12:51:56 +00:00
Mikhail Artemenko
a367de9977 add docs 2024-11-19 12:49:59 +00:00
Mikhail Artemenko
6894e280b2 fix pr issues 2024-11-19 12:34:42 +00:00
Mikhail Artemenko
39ebe113d9 Merge branch 'master' into issues/70174/cluster_versions 2024-11-19 11:28:46 +00:00
udiz
239bbaa133 use length 2024-11-19 00:00:43 +00:00
udiz
07fac5808d format null on test 2024-11-18 23:08:48 +00:00
udiz
ed95e0781f test uses less memory 2024-11-18 22:48:38 +00:00
robot-clickhouse
014608fb6b Automatic style fix 2024-11-18 17:51:51 +00:00
Mikhail Artemenko
a29ded4941 add test for iceberg 2024-11-18 17:39:46 +00:00
Mikhail Artemenko
d2efae7511 enable cluster versions for datalake storages 2024-11-18 17:35:21 +00:00
MikhailBurdukov
4f3dc60c93 Remove holder from config 2024-11-14 09:55:06 +00:00
udiz
6879aa130a newline 2024-11-13 22:47:54 +00:00
udiz
43f3c886a2 add test 2024-11-13 22:46:36 +00:00
udiz
c383a743f7 arrayWithConstant size estimation using single value size 2024-11-13 20:02:31 +00:00
MikhailBurdukov
f1dd6405ac Style 2024-11-07 18:54:24 +00:00
MikhailBurdukov
6edbd61e05 More review 2024-11-07 16:24:02 +00:00
MikhailBurdukov
39130c5ded Increase timeout 2024-11-07 10:53:38 +00:00
MikhailBurdukov
afc0251fd6 Review 2024-11-07 10:52:52 +00:00
MikhailBurdukov
16ef3d7511 Restart Ci 2024-11-06 10:10:41 +00:00
MikhailBurdukov
12354ce1a4 Fix tidy 2024-11-06 08:19:41 +00:00
MikhailBurdukov
429a6ad9bd Style 2024-11-05 18:41:47 +00:00
MikhailBurdukov
5d7d331091 Redesigned 2024-11-05 15:49:01 +00:00
MikhailBurdukov
9c77f61f8e vfork 2024-10-31 14:11:18 +00:00
MikhailBurdukov
d1d8c63e71 Fix zombie processes after library brigde crash. 2024-10-31 12:34:53 +00:00
31 changed files with 679 additions and 60 deletions

View File

@ -49,4 +49,4 @@ LIMIT 2
**See Also**
- [DeltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
- [DeltaLake cluster table function](/docs/en/sql-reference/table-functions/deltalakeCluster.md)

View File

@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/deltalakeCluster
sidebar_position: 46
sidebar_label: deltaLakeCluster
title: "deltaLakeCluster Table Function"
---
This is an extension to the [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
Allows processing files from [Delta Lake](https://github.com/delta-io/delta) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
**Syntax**
``` sql
deltaLakeCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
**Returned value**
A table with the specified structure for reading data from cluster in the specified Delta Lake table in S3.
**See Also**
- [deltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
- [deltaLake table function](/docs/en/sql-reference/table-functions/deltalake.md)

View File

@ -29,4 +29,4 @@ A table with the specified structure for reading data in the specified Hudi tabl
**See Also**
- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
- [Hudi cluster table function](/docs/en/sql-reference/table-functions/hudiCluster.md)

View File

@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/hudiCluster
sidebar_position: 86
sidebar_label: hudiCluster
title: "hudiCluster Table Function"
---
This is an extension to the [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
Allows processing files from Apache [Hudi](https://hudi.apache.org/) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
**Syntax**
``` sql
hudiCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
**Returned value**
A table with the specified structure for reading data from cluster in the specified Hudi table in S3.
**See Also**
- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
- [Hudi table function](/docs/en/sql-reference/table-functions/hudi.md)

View File

@ -72,3 +72,4 @@ Table function `iceberg` is an alias to `icebergS3` now.
**See Also**
- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
- [Iceberg cluster table function](/docs/en/sql-reference/table-functions/icebergCluster.md)

View File

@ -0,0 +1,43 @@
---
slug: /en/sql-reference/table-functions/icebergCluster
sidebar_position: 91
sidebar_label: icebergCluster
title: "icebergCluster Table Function"
---
This is an extension to the [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
Allows processing files from Apache [Iceberg](https://iceberg.apache.org/) in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
**Syntax**
``` sql
icebergS3Cluster(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
icebergS3Cluster(cluster_name, named_collection[, option=value [,..]])
icebergAzureCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
icebergAzureCluster(cluster_name, named_collection[, option=value [,..]])
icebergHDFSCluster(cluster_name, path_to_table, [,format] [,compression_method])
icebergHDFSCluster(cluster_name, named_collection[, option=value [,..]])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
**Returned value**
A table with the specified structure for reading data from cluster in the specified Iceberg table.
**Examples**
```sql
SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```
**See Also**
- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
- [Iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md)

View File

@ -2,6 +2,7 @@
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
#include <Common/ShellCommandsHolder.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
@ -29,7 +30,7 @@ LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_)
void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
{
getContext()->addBridgeCommand(std::move(cmd));
ShellCommandsHolder::instance().addCommand(std::move(cmd));
}

View File

@ -11,6 +11,7 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/BridgeProtocolVersion.h>
#include <Common/ShellCommand.h>
#include <Common/ShellCommandsHolder.h>
#include <IO/ConnectionTimeouts.h>
#include <base/range.h>
#include <BridgeHelper/IBridgeHelper.h>
@ -144,7 +145,7 @@ protected:
void startBridge(std::unique_ptr<ShellCommand> cmd) const override
{
getContext()->addBridgeCommand(std::move(cmd));
ShellCommandsHolder::instance().addCommand(std::move(cmd));
}

View File

@ -61,6 +61,9 @@ LoggerPtr ShellCommand::getLogger()
ShellCommand::~ShellCommand()
{
if (do_not_terminate)
return;
if (wait_called)
return;
@ -291,11 +294,48 @@ std::unique_ptr<ShellCommand> ShellCommand::executeDirect(const ShellCommand::Co
return executeImpl(path.data(), argv.data(), config);
}
struct ShellCommand::tryWaitResult
{
bool is_process_terminated = false;
int retcode = -1;
};
int ShellCommand::tryWait()
{
return tryWaitImpl(true).retcode;
}
ShellCommand::tryWaitResult ShellCommand::tryWaitImpl(bool blocking)
{
LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid);
ShellCommand::tryWaitResult result;
int options = ((!blocking) ? WNOHANG : 0);
int status = 0;
int waitpid_retcode = -1;
while (waitpid_retcode < 0)
{
waitpid_retcode = waitpid(pid, &status, options);
if (waitpid_retcode > 0)
{
break;
}
if (!blocking && !waitpid_retcode)
{
result.is_process_terminated = false;
return result;
}
if (errno != EINTR)
throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid");
}
LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status);
wait_called = true;
result.is_process_terminated = true;
in.close();
out.close();
err.close();
@ -306,19 +346,11 @@ int ShellCommand::tryWait()
for (auto & [_, fd] : read_fds)
fd.close();
LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid);
int status = 0;
while (waitpid(pid, &status, 0) < 0)
{
if (errno != EINTR)
throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid");
}
LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status);
if (WIFEXITED(status))
return WEXITSTATUS(status);
{
result.retcode = WEXITSTATUS(status);
return result;
}
if (WIFSIGNALED(status))
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was terminated by signal {}", toString(WTERMSIG(status)));
@ -330,10 +362,8 @@ int ShellCommand::tryWait()
}
void ShellCommand::wait()
void ShellCommand::handleProcessRetcode(int retcode) const
{
int retcode = tryWait();
if (retcode != EXIT_SUCCESS)
{
switch (retcode)
@ -356,5 +386,22 @@ void ShellCommand::wait()
}
}
bool ShellCommand::waitIfProccesTerminated()
{
auto proc_status = tryWaitImpl(false);
if (proc_status.is_process_terminated)
{
handleProcessRetcode(proc_status.retcode);
}
return proc_status.is_process_terminated;
}
void ShellCommand::wait()
{
int retcode = tryWaitImpl(true).retcode;
handleProcessRetcode(retcode);
}
}

View File

@ -67,6 +67,21 @@ public:
DestructorStrategy terminate_in_destructor_strategy = DestructorStrategy(false, 0);
};
pid_t getPid() const
{
return pid;
}
bool isWaitCalled() const
{
return wait_called;
}
void setDoNotTerminate()
{
do_not_terminate = true;
}
/// Run the command using /bin/sh -c.
/// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process.
static std::unique_ptr<ShellCommand> execute(const Config & config);
@ -81,6 +96,10 @@ public:
/// Wait for the process to finish, see the return code. To throw an exception if the process was not completed independently.
int tryWait();
/// Returns if process terminated.
/// If process terminated, then handle return code.
bool waitIfProccesTerminated();
WriteBufferFromFile in; /// If the command reads from stdin, do not forget to call in.close() after writing all the data there.
ReadBufferFromFile out;
ReadBufferFromFile err;
@ -92,10 +111,16 @@ private:
pid_t pid;
Config config;
bool wait_called = false;
bool do_not_terminate = false;
ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, const Config & config);
bool tryWaitProcessWithTimeout(size_t timeout_in_seconds);
struct tryWaitResult;
tryWaitResult tryWaitImpl(bool blocking);
void handleProcessRetcode(int retcode) const;
static LoggerPtr getLogger();

View File

@ -0,0 +1,53 @@
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include <Common/ShellCommandsHolder.h>
namespace DB
{
ShellCommandsHolder & ShellCommandsHolder::instance()
{
static ShellCommandsHolder instance;
return instance;
}
void ShellCommandsHolder::removeCommand(pid_t pid)
{
std::lock_guard lock(mutex);
bool is_erased = shell_commands.erase(pid);
LOG_TRACE(log, "Try to erase command with the pid {}, is_erased: {}", pid, is_erased);
}
void ShellCommandsHolder::addCommand(std::unique_ptr<ShellCommand> command)
{
std::lock_guard lock(mutex);
pid_t command_pid = command->getPid();
if (command->waitIfProccesTerminated())
{
LOG_TRACE(log, "Pid {} already finished. Do not insert it.", command_pid);
return;
}
auto [iterator, is_inserted] = shell_commands.try_emplace(command_pid, std::move(command));
if (is_inserted)
{
LOG_TRACE(log, "Inserted the command with pid {}", command_pid);
return;
}
if (iterator->second->isWaitCalled())
{
iterator->second = std::move(command);
LOG_TRACE(log, "Replaced the command with pid {}", command_pid);
return;
}
/// We got two active ShellCommand with the same pid.
/// Probably it is a bug, will try to replace the old shell command with a new one.
chassert(false);
LOG_WARNING(log, "The PID already presented in active shell commands, will try to replace with a new one.");
iterator->second->setDoNotTerminate();
iterator->second = std::move(command);
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Common/ShellCommand.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <mutex>
#include <unordered_map>
namespace DB
{
/** The holder class for running background shell processes.
*/
class ShellCommandsHolder final : public boost::noncopyable
{
public:
static ShellCommandsHolder & instance();
void removeCommand(pid_t pid);
void addCommand(std::unique_ptr<ShellCommand> command);
private:
using ShellCommands = std::unordered_map<pid_t, std::unique_ptr<ShellCommand>>;
std::mutex mutex;
ShellCommands shell_commands TSA_GUARDED_BY(mutex);
LoggerPtr log = getLogger("ShellCommandsHolder");
};
}

View File

@ -1,6 +1,7 @@
#include <Common/SignalHandlers.h>
#include <Common/config_version.h>
#include <Common/getHashOfLoadedBinary.h>
#include <Common/ShellCommandsHolder.h>
#include <Common/CurrentThread.h>
#include <Daemon/BaseDaemon.h>
#include <Daemon/SentryWriter.h>
@ -68,6 +69,20 @@ void terminateRequestedSignalHandler(int sig, siginfo_t *, void *)
writeSignalIDtoSignalPipe(sig);
}
void childSignalHandler(int sig, siginfo_t * info, void *)
{
DENY_ALLOCATIONS_IN_SCOPE;
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
char buf[signal_pipe_buf_size];
auto & signal_pipe = HandledSignals::instance().signal_pipe;
WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf);
writeBinary(sig, out);
writeBinary(info->si_pid, out);
out.next();
errno = saved_errno;
}
void signalHandler(int sig, siginfo_t * info, void * context)
{
@ -294,6 +309,12 @@ void SignalListener::run()
if (daemon)
daemon->handleSignal(sig);
}
else if (sig == SIGCHLD)
{
pid_t child_pid = 0;
readBinary(child_pid, in);
ShellCommandsHolder::instance().removeCommand(child_pid);
}
else
{
siginfo_t info{};

View File

@ -33,6 +33,7 @@ void closeLogsSignalHandler(int sig, siginfo_t *, void *);
void terminateRequestedSignalHandler(int sig, siginfo_t *, void *);
void childSignalHandler(int sig, siginfo_t * info, void *);
/** Handler for "fault" or diagnostic signals. Send data about fault to separate thread to write into log.
*/

View File

@ -440,6 +440,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
HandledSignals::instance().setupCommonDeadlySignalHandlers();
HandledSignals::instance().setupCommonTerminateRequestSignalHandlers();
HandledSignals::instance().addSignalHandler({SIGHUP}, closeLogsSignalHandler, true);
HandledSignals::instance().addSignalHandler({SIGCHLD}, childSignalHandler, true);
/// Set up Poco ErrorHandler for Poco Threads.
static KillingErrorHandler killing_error_handler;

View File

@ -62,16 +62,17 @@ public:
for (size_t i = 0; i < num_rows; ++i)
{
auto array_size = col_num->getInt(i);
auto element_size = col_value->byteSizeAt(i);
if (unlikely(array_size < 0))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} cannot be negative: while executing function {}", array_size, getName());
Int64 estimated_size = 0;
if (unlikely(common::mulOverflow(array_size, col_value->byteSize(), estimated_size)))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
if (unlikely(common::mulOverflow(array_size, element_size, estimated_size)))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, element_size, getName());
if (unlikely(estimated_size > max_array_size_in_columns_bytes))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, element_size, getName());
offset += array_size;

View File

@ -540,9 +540,6 @@ struct ContextSharedPart : boost::noncopyable
/// No lock required for application_type modified only during initialization
Context::ApplicationType application_type = Context::ApplicationType::SERVER;
/// vector of xdbc-bridge commands, they will be killed when Context will be destroyed
std::vector<std::unique_ptr<ShellCommand>> bridge_commands TSA_GUARDED_BY(mutex);
/// No lock required for config_reload_callback, start_servers_callback, stop_servers_callback modified only during initialization
Context::ConfigReloadCallback config_reload_callback;
Context::StartStopServersCallback start_servers_callback;
@ -5067,12 +5064,6 @@ void Context::addQueryParameters(const NameToNameMap & parameters)
query_parameters.insert_or_assign(name, value);
}
void Context::addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const
{
std::lock_guard lock(shared->mutex);
shared->bridge_commands.emplace_back(std::move(cmd));
}
IHostContextPtr & Context::getHostContext()
{

View File

@ -1288,8 +1288,6 @@ public:
/// Overrides values of existing parameters.
void addQueryParameters(const NameToNameMap & parameters);
/// Add started bridge command. It will be killed after context destruction
void addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const;
IHostContextPtr & getHostContext();
const IHostContextPtr & getHostContext() const;

View File

@ -226,6 +226,26 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf
#endif
template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
#if USE_AVRO && USE_AWS_S3
template class TableFunctionObjectStorage<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorage<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif
#if USE_AVRO && USE_HDFS
template class TableFunctionObjectStorage<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif
#if USE_PARQUET && USE_AWS_S3
template class TableFunctionObjectStorage<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif
#if USE_AWS_S3
template class TableFunctionObjectStorage<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif
#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{

View File

@ -96,7 +96,7 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
{
.documentation = {
.description=R"(The table function can be used to read the data stored on HDFS in parallel for many nodes in a specified cluster.)",
.examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster_name, uri, format)", ""}}},
.examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster, uri, format)", ""}}},
.allow_readonly = false
}
);
@ -105,15 +105,77 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
UNUSED(factory);
}
#if USE_AVRO
void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AWS_S3
template class TableFunctionObjectStorageCluster<S3ClusterDefinition, StorageS3Configuration>;
factory.registerFunction<TableFunctionIcebergS3Cluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)",
.examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
#if USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureConfiguration>;
factory.registerFunction<TableFunctionIcebergAzureCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)",
.examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
#if USE_HDFS
template class TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
factory.registerFunction<TableFunctionIcebergHDFSCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)",
.examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
}
#endif
#if USE_AWS_S3
#if USE_PARQUET
void registerTableFunctionDeltaLakeCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDeltaLakeCluster>(
{.documentation
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
.examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(cluster, url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
void registerTableFunctionHudiCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionHudiCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)",
.examples{{"hudiCluster", "SELECT * FROM hudiCluster(cluster, url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AVRO
registerTableFunctionIcebergCluster(factory);
#endif
#if USE_AWS_S3
#if USE_PARQUET
registerTableFunctionDeltaLakeCluster(factory);
#endif
registerTableFunctionHudiCluster(factory);
#endif
}
}

View File

@ -33,6 +33,36 @@ struct HDFSClusterDefinition
static constexpr auto storage_type_name = "HDFSCluster";
};
struct IcebergS3ClusterDefinition
{
static constexpr auto name = "icebergS3Cluster";
static constexpr auto storage_type_name = "IcebergS3Cluster";
};
struct IcebergAzureClusterDefinition
{
static constexpr auto name = "icebergAzureCluster";
static constexpr auto storage_type_name = "IcebergAzureCluster";
};
struct IcebergHDFSClusterDefinition
{
static constexpr auto name = "icebergHDFSCluster";
static constexpr auto storage_type_name = "IcebergHDFSCluster";
};
struct DeltaLakeClusterDefinition
{
static constexpr auto name = "deltaLakeCluster";
static constexpr auto storage_type_name = "DeltaLakeS3Cluster";
};
struct HudiClusterDefinition
{
static constexpr auto name = "hudiCluster";
static constexpr auto storage_type_name = "HudiS3Cluster";
};
/**
* Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions,
* which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster.
@ -79,4 +109,25 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster<AzureClu
#if USE_HDFS
using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif
#if USE_AVRO && USE_AWS_S3
using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif
#if USE_AVRO && USE_HDFS
using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif
#if USE_AWS_S3 && USE_PARQUET
using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif
#if USE_AWS_S3
using TableFunctionHudiCluster = TableFunctionObjectStorageCluster<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif
}

View File

@ -66,6 +66,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]
registerTableFunctionObjectStorage(factory);
registerTableFunctionObjectStorageCluster(factory);
registerDataLakeTableFunctions(factory);
registerDataLakeClusterTableFunctions(factory);
}
}

View File

@ -70,6 +70,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
void registerDataLakeTableFunctions(TableFunctionFactory & factory);
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
void registerTableFunctionTimeSeries(TableFunctionFactory & factory);

View File

@ -1657,6 +1657,7 @@ class ClickHouseCluster:
extra_configs=[],
extra_args="",
randomize_settings=True,
use_docker_init_flag=False,
) -> "ClickHouseInstance":
"""Add an instance to the cluster.
@ -1762,6 +1763,7 @@ class ClickHouseCluster:
config_root_name=config_root_name,
extra_configs=extra_configs,
randomize_settings=randomize_settings,
use_docker_init_flag=use_docker_init_flag,
)
docker_compose_yml_dir = get_docker_compose_path()
@ -3353,6 +3355,7 @@ services:
{ipv6_address}
{net_aliases}
{net_alias1}
init: {init_flag}
"""
@ -3419,6 +3422,7 @@ class ClickHouseInstance:
config_root_name="clickhouse",
extra_configs=[],
randomize_settings=True,
use_docker_init_flag=False,
):
self.name = name
self.base_cmd = cluster.base_cmd
@ -3545,6 +3549,7 @@ class ClickHouseInstance:
self.with_installed_binary = with_installed_binary
self.is_up = False
self.config_root_name = config_root_name
self.docker_init_flag = use_docker_init_flag
def is_built_with_sanitizer(self, sanitizer_name=""):
build_opts = self.query(
@ -4838,6 +4843,7 @@ class ClickHouseInstance:
ipv6_address=ipv6_address,
net_aliases=net_aliases,
net_alias1=net_alias1,
init_flag="true" if self.docker_init_flag else "false",
)
)

View File

@ -14,6 +14,11 @@ instance = cluster.add_instance(
dictionaries=["configs/dictionaries/dict1.xml"],
main_configs=["configs/config.d/config.xml"],
stay_alive=True,
# WA for the problem with zombie processes inside the docker container.
# This is important here because we are checking that there are no zombie processes
# after craches inside the library bridge.
# https://forums.docker.com/t/what-the-latest-with-the-zombie-process-reaping-problem/50758/2
use_docker_init_flag=True,
)
@ -34,6 +39,13 @@ def create_dict_simple(ch_instance):
)
def check_no_zombie_processes(instance):
res = instance.exec_in_container(
["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root"
)
assert res == "0\n"
@pytest.fixture(scope="module")
def ch_cluster():
try:
@ -263,6 +275,8 @@ def test_recover_after_bridge_crash(ch_cluster):
instance.exec_in_container(
["bash", "-c", "kill -9 `pidof clickhouse-library-bridge`"], user="root"
)
check_no_zombie_processes(instance)
instance.query("DROP DICTIONARY lib_dict_c")
@ -288,6 +302,8 @@ def test_server_restart_bridge_might_be_stil_alive(ch_cluster):
result = instance.query("""select dictGet(lib_dict_c, 'value1', toUInt64(1));""")
assert result.strip() == "101"
check_no_zombie_processes(instance)
instance.query("DROP DICTIONARY lib_dict_c")

View File

@ -71,13 +71,13 @@ def test_bridge_dies_with_parent(ch_cluster):
except:
pass
for i in range(30):
for i in range(60):
time.sleep(1)
clickhouse_pid = instance.get_process_pid("clickhouse server")
if clickhouse_pid is None:
break
for i in range(30):
for i in range(60):
time.sleep(1)
bridge_pid = instance.get_process_pid("library-bridge")
if bridge_pid is None:
@ -95,5 +95,5 @@ def test_bridge_dies_with_parent(ch_cluster):
assert clickhouse_pid is None
assert bridge_pid is None
finally:
instance.start_clickhouse(20)
instance.start_clickhouse(60)
instance.query("DROP DICTIONARY lib_dict_c")

View File

@ -0,0 +1,20 @@
<clickhouse>
<remote_servers>
<cluster_simple>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</cluster_simple>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,6 @@
<clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
</query_log>
</clickhouse>

View File

@ -73,14 +73,38 @@ def started_cluster():
cluster.add_instance(
"node1",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
with_minio=True,
with_azurite=True,
stay_alive=True,
with_hdfs=with_hdfs,
stay_alive=True,
)
cluster.add_instance(
"node2",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
stay_alive=True,
)
cluster.add_instance(
"node3",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
stay_alive=True,
)
logging.info("Starting cluster...")
@ -182,6 +206,7 @@ def get_creation_expression(
cluster,
format="Parquet",
table_function=False,
run_on_cluster=False,
**kwargs,
):
if storage_type == "s3":
@ -189,35 +214,56 @@ def get_creation_expression(
bucket = kwargs["bucket"]
else:
bucket = cluster.minio_bucket
print(bucket)
if table_function:
return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
if run_on_cluster:
assert table_function
return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
if table_function:
return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
elif storage_type == "azure":
if table_function:
if run_on_cluster:
assert table_function
return f"""
icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
if table_function:
return f"""
icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
elif storage_type == "hdfs":
if table_function:
if run_on_cluster:
assert table_function
return f"""
icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
if table_function:
return f"""
icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
elif storage_type == "local":
assert not run_on_cluster
if table_function:
return f"""
icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})
@ -227,6 +273,7 @@ def get_creation_expression(
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""
else:
raise Exception(f"Unknown iceberg storage type: {storage_type}")
@ -492,6 +539,108 @@ def test_types(started_cluster, format_version, storage_type):
)
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"])
def test_cluster_table_function(started_cluster, format_version, storage_type):
if is_arm() and storage_type == "hdfs":
pytest.skip("Disabled test IcebergHDFS for aarch64")
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
TABLE_NAME = (
"test_iceberg_cluster_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
def add_df(mode):
write_iceberg_from_df(
spark,
generate_data(spark, 0, 100),
TABLE_NAME,
mode=mode,
format_version=format_version,
)
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
logging.info(f"Adding another dataframe. result files: {files}")
return files
files = add_df(mode="overwrite")
for i in range(1, len(started_cluster.instances)):
files = add_df(mode="append")
logging.info(f"Setup complete. files: {files}")
assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1)
clusters = instance.query(f"SELECT * FROM system.clusters")
logging.info(f"Clusters setup: {clusters}")
# Regular Query only node1
table_function_expr = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True
)
select_regular = (
instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
)
# Cluster Query with node1 as coordinator
table_function_expr_cluster = get_creation_expression(
storage_type,
TABLE_NAME,
started_cluster,
table_function=True,
run_on_cluster=True,
)
select_cluster = (
instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
)
# Simple size check
assert len(select_regular) == 600
assert len(select_cluster) == 600
# Actual check
assert select_cluster == select_regular
# Check query_log
for replica in started_cluster.instances.values():
replica.query("SYSTEM FLUSH LOGS")
for node_name, replica in started_cluster.instances.items():
cluster_secondary_queries = (
replica.query(
f"""
SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
WHERE
type = 'QueryStart' AND
positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND
position(query, '{TABLE_NAME}') != 0 AND
position(query, 'system.query_log') = 0 AND
NOT is_initial_query
"""
)
.strip()
.split("\n")
)
logging.info(
f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
)
assert len(cluster_secondary_queries) == 1
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
def test_delete_files(started_cluster, format_version, storage_type):

View File

@ -1,3 +1,6 @@
SELECT arrayWithConstant(96142475, ['qMUF']); -- { serverError TOO_LARGE_ARRAY_SIZE }
SELECT arrayWithConstant(100000000, materialize([[[[[[[[[['Hello, world!']]]]]]]]]])); -- { serverError TOO_LARGE_ARRAY_SIZE }
SELECT length(arrayWithConstant(10000000, materialize([[[[[[[[[['Hello world']]]]]]]]]])));
CREATE TEMPORARY TABLE args (value Array(Int)) ENGINE=Memory AS SELECT [1, 1, 1, 1] as value FROM numbers(1, 100);
SELECT length(arrayWithConstant(1000000, value)) FROM args FORMAT NULL;

View File

@ -244,7 +244,10 @@ Deduplication
DefaultTableEngine
DelayedInserts
DeliveryTag
Deltalake
DeltaLake
deltalakeCluster
deltaLakeCluster
Denormalize
DestroyAggregatesThreads
DestroyAggregatesThreadsActive
@ -377,10 +380,15 @@ Homebrew's
HorizontalDivide
Hostname
HouseOps
hudi
Hudi
hudiCluster
HudiCluster
HyperLogLog
Hypot
IANA
icebergCluster
IcebergCluster
IDE
IDEs
IDNA