Merge branch 'master' into better-pipeline1
Commit 33115a55c3
@@ -261,8 +261,8 @@ endif ()
# Add a section with the hash of the compiled machine code for integrity checks.
# Only for official builds, because adding a section can be time consuming (rewrite of several GB).
# And cross compiled binaries are not supported (since you cannot execute clickhouse hash-binary)
if (OBJCOPY_PATH AND CLICKHOUSE_OFFICIAL_BUILD AND (NOT CMAKE_TOOLCHAIN_FILE))
    set (USE_BINARY_HASH 1)
if (OBJCOPY_PATH AND CLICKHOUSE_OFFICIAL_BUILD AND (NOT CMAKE_TOOLCHAIN_FILE OR CMAKE_TOOLCHAIN_FILE MATCHES "linux/toolchain-x86_64.cmake$"))
    set (USE_BINARY_HASH 1 CACHE STRING "Calculate binary hash and store it in the separate section")
endif ()

# Allows to build stripped binary in a separate directory
@@ -2,6 +2,7 @@ set (SRCS
    argsToConfig.cpp
    coverage.cpp
    demangle.cpp
    getAvailableMemoryAmount.cpp
    getFQDNOrHostName.cpp
    getMemoryAmount.cpp
    getPageSize.cpp
base/base/getAvailableMemoryAmount.cpp (new file, 44 lines)
@@ -0,0 +1,44 @@
#include <stdexcept>
#include <fstream>
#include <base/getAvailableMemoryAmount.h>
#include <base/getPageSize.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/param.h>
#if defined(BSD)
#include <sys/sysctl.h>
#include <sys/vmmeter.h>
#endif


uint64_t getAvailableMemoryAmountOrZero()
{
#if defined(_SC_AVPHYS_PAGES) // linux
    return getPageSize() * sysconf(_SC_AVPHYS_PAGES);
#elif defined(__FreeBSD__)
    struct vmtotal vmt;
    size_t vmt_size = sizeof(vmt);
    if (sysctlbyname("vm.vmtotal", &vmt, &vmt_size, NULL, 0) == 0)
        return getPageSize() * vmt.t_avm;
    else
        return 0;
#else // darwin
    unsigned int usermem;
    size_t len = sizeof(usermem);
    static int mib[2] = { CTL_HW, HW_USERMEM };
    if (sysctl(mib, 2, &usermem, &len, nullptr, 0) == 0 && len == sizeof(usermem))
        return usermem;
    else
        return 0;
#endif
}


uint64_t getAvailableMemoryAmount()
{
    auto res = getAvailableMemoryAmountOrZero();
    if (!res)
        throw std::runtime_error("Cannot determine available memory amount");
    return res;
}
base/base/getAvailableMemoryAmount.h (new file, 12 lines)
@@ -0,0 +1,12 @@
#pragma once

#include <cstdint>

/** Returns the size of currently available physical memory (RAM) in bytes.
  * Returns 0 on unsupported platform or if it cannot determine the size of physical memory.
  */
uint64_t getAvailableMemoryAmountOrZero();

/** Throws exception if it cannot determine the size of physical memory.
  */
uint64_t getAvailableMemoryAmount();
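A minimal usage sketch of the new API (hypothetical caller, not part of this commit): getAvailableMemoryAmountOrZero() is the non-throwing variant, while getAvailableMemoryAmount() throws if the amount cannot be determined.

```cpp
#include <cstdio>
#include <base/getAvailableMemoryAmount.h>

int main()
{
    /// Prefer the *OrZero variant when a missing value is acceptable.
    if (uint64_t available = getAvailableMemoryAmountOrZero())
        printf("Available RAM: %llu bytes\n", static_cast<unsigned long long>(available));
    else
        printf("Could not determine available RAM\n");
    return 0;
}
```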
@@ -131,8 +131,23 @@ clickhouse-client -q "system flush logs" ||:

grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz &
clickhouse-client -q "select * from system.query_log format TSVWithNamesAndTypes" | pigz > /test_output/query-log.tsv.gz &
clickhouse-client -q "select * from system.query_thread_log format TSVWithNamesAndTypes" | pigz > /test_output/query-thread-log.tsv.gz &

# Compress tables.
#
# NOTE:
# - due to tests with s3 storage we cannot use /var/lib/clickhouse/data
#   directly
# - even though CI auto-compresses some files (but not *.tsv), it does this only
#   for files >64MB, and we want these files to be compressed explicitly
for table in query_log zookeeper_log trace_log
do
    clickhouse-client -q "select * from system.$table format TSVWithNamesAndTypes" | pigz > /test_output/$table.tsv.gz &
    if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
        clickhouse-client --port 19000 -q "select * from system.$table format TSVWithNamesAndTypes" | pigz > /test_output/$table.1.tsv.gz &
        clickhouse-client --port 29000 -q "select * from system.$table format TSVWithNamesAndTypes" | pigz > /test_output/$table.2.tsv.gz &
    fi
done
wait ||:

# Also export trace log in flamegraph-friendly format.
for trace_type in CPU Memory Real
@@ -161,14 +176,6 @@ fi

tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||:

# Replace the engine with Ordinary to avoid extra symlinks stuff in artifacts.
# (so that clickhouse-local --path can read it w/o extra care).
sed -i -e "s/ATTACH DATABASE _ UUID '[^']*'/ATTACH DATABASE system/" -e "s/Atomic/Ordinary/" /var/lib/clickhouse/metadata/system.sql
for table in text_log query_log zookeeper_log trace_log; do
    sed -i "s/ATTACH TABLE _ UUID '[^']*'/ATTACH TABLE $table/" /var/lib/clickhouse/metadata/system/${table}.sql
    tar -chf /test_output/${table}_dump.tar /var/lib/clickhouse/metadata/system.sql /var/lib/clickhouse/metadata/system/${table}.sql /var/lib/clickhouse/data/system/${table} ||:
done

if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
    grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server1.log ||:
    grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server2.log ||:
@@ -179,8 +186,6 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
    rm /var/log/clickhouse-server/clickhouse-server2.log
    mv /var/log/clickhouse-server/stderr1.log /test_output/ ||:
    mv /var/log/clickhouse-server/stderr2.log /test_output/ ||:
    tar -chf /test_output/zookeeper_log_dump1.tar /var/lib/clickhouse1/data/system/zookeeper_log ||:
    tar -chf /test_output/zookeeper_log_dump2.tar /var/lib/clickhouse2/data/system/zookeeper_log ||:
    tar -chf /test_output/coordination1.tar /var/lib/clickhouse1/coordination ||:
    tar -chf /test_output/coordination2.tar /var/lib/clickhouse2/coordination ||:
fi
@@ -688,7 +688,7 @@ Tags:
- `volume_name_N` — Volume name. Volume names must be unique.
- `disk` — a disk within a volume.
- `max_data_part_size_bytes` — the maximum size of a part that can be stored on any of the volume's disks. If the size of a merged part is estimated to be bigger than `max_data_part_size_bytes`, then this part will be written to the next volume. Basically this feature allows keeping new/small parts on a hot (SSD) volume and moving them to a cold (HDD) volume when they reach a large size. Do not use this setting if your policy has only one volume.
- `move_factor` — when the amount of available space gets lower than this factor, data automatically start to move on the next volume if any (by default, 0.1).
- `move_factor` — when the amount of available space gets lower than this factor, data automatically starts to move on the next volume if any (by default, 0.1). ClickHouse sorts existing parts by size from largest to smallest (in descending order) and selects parts with the total size that is sufficient to meet the `move_factor` condition. If the total size of all parts is insufficient, all parts will be moved.
- `prefer_not_to_merge` — Disables merging of data parts on this volume. When this setting is enabled, merging data on this volume is not allowed. This allows controlling how ClickHouse works with slow disks.

Configuration examples:
@@ -36,6 +36,7 @@ Example of configuration:
            <access_key_id>AKIAIOSFODNN7EXAMPLE</access_key_id>
            <secret_access_key>wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY</secret_access_key>
            <format>CSV</format>
            <url>https://s3.us-east-1.amazonaws.com/yourbucket/mydata/</url>
        </s3_mydata>
    </named_collections>
</clickhouse>
@@ -44,12 +45,12 @@ Example of configuration:

### Example of using named connections with the s3 function

```sql
INSERT INTO FUNCTION s3(s3_mydata, url = 'https://s3.us-east-1.amazonaws.com/yourbucket/mydata/test_file.tsv.gz',
INSERT INTO FUNCTION s3(s3_mydata, filename = 'test_file.tsv.gz',
    format = 'TSV', structure = 'number UInt64', compression_method = 'gzip')
SELECT * FROM numbers(10000);

SELECT count()
FROM s3(s3_mydata, url = 'https://s3.us-east-1.amazonaws.com/yourbucket/mydata/test_file.tsv.gz')
FROM s3(s3_mydata, filename = 'test_file.tsv.gz')

┌─count()─┐
│   10000 │
@@ -273,7 +273,7 @@ GitHubのUIでforkリポジトリに移動します。 ブランチで開発し

A pull request can be created even if the work is not yet complete. In that case, put the word "WIP" (work in progress) at the beginning of the title; it can be changed later. This is useful for collaborative review and discussion of the changes, and for running all of the available tests. It is important to provide a brief description of the changes.

Tests will start as soon as Yandex employees label your PR with the "can be tested" tag. The results of some first checks (e.g. code style) will come in within several minutes. Build check results will arrive within half an hour. And the main set of tests will report itself within an hour.
Tests will start as soon as ClickHouse employees label your PR with the "can be tested" tag. The results of some first checks (e.g. code style) will come in within several minutes. Build check results will arrive within half an hour. And the main set of tests will report itself within an hour.

The system prepares ClickHouse binary builds for your pull request separately. To get these builds, follow the "Details" link next to the "ClickHouse build check" entry in the list of checks. There you will find direct links to the builds. The ClickHouse deb packages can even be deployed to your production servers (if you are not afraid).
@@ -72,11 +72,11 @@ ClickHouse не работает и не собирается на 32-битны

This option is not suitable for pushing changes to the server. You can use it temporarily, then add SSH keys and replace the repository address with the `git remote` command.

You can also add the address of the original Yandex repository to your local repository in order to pull updates from it:
You can also add the address of the original repository to your local repository in order to pull updates from it:

    git remote add upstream git@github.com:ClickHouse/ClickHouse.git

After that, you will be able to pull updates from the Yandex repository into your repository with the `git pull upstream master` command.
After that, you will be able to pull updates from the ClickHouse repository into your repository with the `git pull upstream master` command.

### Working with Git submodules {#rabota-s-sabmoduliami-git}

@@ -288,7 +288,7 @@ sudo ./llvm.sh 12

A pull request can be created even if the work on the task is not yet finished. In that case, add the word «WIP» (work in progress) to its title; it can be renamed later. This is useful for collaborative review and discussion of the changes, and for running all of the available tests. Provide a short description of the changes: it will later be used to generate the release changelog.

Tests will be started as soon as Yandex employees set the «Can be tested» tag for the pull request. The results of the first checks (code style) will appear within a few minutes. Build results will appear in about half an hour. The results of the main test set will be available within an hour.
Tests will be started as soon as ClickHouse employees set the «Can be tested» tag for the pull request. The results of the first checks (code style) will appear within a few minutes. Build results will appear in about half an hour. The results of the main test set will be available within an hour.

The system will prepare ClickHouse builds specifically for your pull request. To get them, click the «Details» link next to the «Clickhouse build check» entry. There you will find direct links to the built .deb packages of ClickHouse, which, if you wish, you can even install on your production servers (if you are not afraid).
@@ -678,7 +678,7 @@ TTL d + INTERVAL 1 MONTH GROUP BY k1, k2 SET x = max(x), y = min(y);
- `volume_name_N` — the name of the volume. Volume names must be unique.
- `disk` — a disk within the volume.
- `max_data_part_size_bytes` — the maximum size of a data part that can be stored on any of the disks of this volume. If a merged part is expected to be bigger than `max_data_part_size_bytes`, it will be written to the next volume. Essentially this feature allows keeping new/small parts on a hot (SSD) volume and moving them to a cold (HDD) volume once they reach a large size. Do not use this setting if the policy has only one volume.
- `move_factor` — the share of available free space on the volume; when free space drops below this value, data starts to move to the next volume, if there is one (0.1 by default).
- `move_factor` — the share of available free space on the volume; when free space drops below this value, data starts to move to the next volume, if there is one (0.1 by default). For moving, parts are sorted by size from largest to smallest (in descending order), and parts are selected whose total size is sufficient to satisfy the `move_factor` condition; if the total size of all parts is insufficient, all parts will be moved.
- `prefer_not_to_merge` — disables merging of data parts stored on this volume. When this setting is enabled, merging of data stored on this volume is not allowed. This allows controlling how ClickHouse works with slow disks.

Configuration examples:
@@ -259,7 +259,7 @@ ClickHouse的架构描述可以在此处查看:https://clickhouse.com/docs/en/

A pull request can be created even if the work is not yet complete. In that case, add «WIP» (work in progress) at the beginning of the title so that it can be changed later. This is useful for collaborative review and discussion of the changes, and for running all available test cases. It is important to provide a short description of the changes; it will later be used to generate the release changelog.

Testing will start as soon as Yandex members put the «can be tested» label on your pull request. The results of some initial checks (for example, code style) will come back within a few minutes. Build check results will be ready within half an hour. The main set of test results will be reported within an hour.
Testing will start as soon as ClickHouse members put the «can be tested» label on your pull request. The results of some initial checks (for example, code style) will come back within a few minutes. Build check results will be ready within half an hour. The main set of test results will be reported within an hour.

The system will prepare ClickHouse binary builds separately for your pull request. To retrieve these builds, click the «Details» link next to the «ClickHouse build check» entry in the check list. There you will find direct links to the .deb packages of ClickHouse, which you can even deploy on your production servers (if you are not afraid).
@@ -20,6 +20,7 @@
#include <base/phdr_cache.h>
#include <base/ErrorHandlers.h>
#include <base/getMemoryAmount.h>
#include <base/getAvailableMemoryAmount.h>
#include <base/errnoToString.h>
#include <base/coverage.h>
#include <base/getFQDNOrHostName.h>
@@ -45,6 +46,7 @@
#include <Core/ServerUUID.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/IOThreadPool.h>
#include <IO/UseSSL.h>
#include <Interpreters/AsynchronousMetrics.h>
@@ -80,6 +82,7 @@
#include <Common/SensitiveDataMasker.h>
#include <Common/ThreadFuzzer.h>
#include <Common/getHashOfLoadedBinary.h>
#include <Common/filesystemHelpers.h>
#include <Common/Elf.h>
#include <Server/MySQLHandlerFactory.h>
#include <Server/PostgreSQLHandlerFactory.h>
@@ -505,6 +508,101 @@ void checkForUsersNotInMainConfig(
    }
}

/// Unused in other builds
#if defined(OS_LINUX)
static String readString(const String & path)
{
    ReadBufferFromFile in(path);
    String contents;
    readStringUntilEOF(contents, in);
    return contents;
}

static int readNumber(const String & path)
{
    ReadBufferFromFile in(path);
    int result;
    readText(result, in);
    return result;
}

#endif

static void sanityChecks(Server * server)
{
    std::string data_path = getCanonicalPath(server->config().getString("path", DBMS_DEFAULT_PATH));
    std::string logs_path = server->config().getString("logger.log", "");

#if defined(OS_LINUX)
    try
    {
        if (readString("/sys/devices/system/clocksource/clocksource0/current_clocksource").find("tsc") == std::string::npos)
            server->context()->addWarningMessage("Linux is not using fast TSC clock source. Performance can be degraded.");
    }
    catch (...)
    {
    }

    try
    {
        if (readNumber("/proc/sys/vm/overcommit_memory") == 2)
            server->context()->addWarningMessage("Linux memory overcommit is disabled.");
    }
    catch (...)
    {
    }

    try
    {
        if (readString("/sys/kernel/mm/transparent_hugepage/enabled").find("[always]") != std::string::npos)
            server->context()->addWarningMessage("Linux transparent hugepage are set to \"always\".");
    }
    catch (...)
    {
    }

    try
    {
        if (readNumber("/proc/sys/kernel/pid_max") < 30000)
            server->context()->addWarningMessage("Linux max PID is too low.");
    }
    catch (...)
    {
    }

    try
    {
        if (readNumber("/proc/sys/kernel/threads-max") < 30000)
            server->context()->addWarningMessage("Linux threads max count is too low.");
    }
    catch (...)
    {
    }

    std::string dev_id = getBlockDeviceId(data_path);
    if (getBlockDeviceType(dev_id) == BlockDeviceType::ROT && getBlockDeviceReadAheadBytes(dev_id) == 0)
        server->context()->addWarningMessage("Rotational disk with disabled readahead is in use. Performance can be degraded.");
#endif

    try
    {
        if (getAvailableMemoryAmount() < (2l << 30))
            server->context()->addWarningMessage("Available memory at server startup is too low (2GiB).");

        if (!enoughSpaceInDirectory(data_path, 1ull << 30))
            server->context()->addWarningMessage("Available disk space at server startup is too low (1GiB).");

        if (!logs_path.empty())
        {
            if (!enoughSpaceInDirectory(fs::path(logs_path).parent_path(), 1ull << 30))
                server->context()->addWarningMessage("Available disk space at server startup is too low (1GiB).");
        }
    }
    catch (...)
    {
    }
}

int Server::main(const std::vector<std::string> & /*args*/)
{
    Poco::Logger * log = &logger();
@@ -538,13 +636,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
    global_context->addWarningMessage("Server was built in debug mode. It will work slowly.");
#endif

    if (ThreadFuzzer::instance().isEffective())
        global_context->addWarningMessage("ThreadFuzzer is enabled. Application will run slowly and unstable.");
    if (ThreadFuzzer::instance().isEffective())
        global_context->addWarningMessage("ThreadFuzzer is enabled. Application will run slowly and unstable.");

#if defined(SANITIZER)
    global_context->addWarningMessage("Server was built with sanitizer. It will work slowly.");
#endif

    sanityChecks(this);

    // Initialize global thread pool. Do it before we fetch configs from zookeeper
    // nodes (`from_zk`), because ZooKeeper interface uses the pool. We will
@@ -766,6 +865,38 @@ if (ThreadFuzzer::instance().isEffective())
        }
    }

    /// Try to increase limit on number of threads.
    {
        rlimit rlim;
        if (getrlimit(RLIMIT_NPROC, &rlim))
            throw Poco::Exception("Cannot getrlimit");

        if (rlim.rlim_cur == rlim.rlim_max)
        {
            LOG_DEBUG(log, "rlimit on number of threads is {}", rlim.rlim_cur);
        }
        else
        {
            rlim_t old = rlim.rlim_cur;
            rlim.rlim_cur = rlim.rlim_max;
            int rc = setrlimit(RLIMIT_NPROC, &rlim);
            if (rc != 0)
            {
                LOG_WARNING(log, "Cannot set max number of threads to {}. error: {}", rlim.rlim_cur, strerror(errno));
                rlim.rlim_cur = old;
            }
            else
            {
                LOG_DEBUG(log, "Set max number of threads to {} (was {}).", rlim.rlim_cur, old);
            }
        }

        if (rlim.rlim_cur < 30000)
        {
            global_context->addWarningMessage("Maximum number of threads is lower than 30000. There could be problems with handling a lot of simultaneous queries.");
        }
    }

    static ServerErrorHandler error_handler;
    Poco::ErrorHandler::set(&error_handler);

@@ -220,7 +220,7 @@ static void incrementProfileEventsBlock(Block & dst, const Block & src)
}


std::atomic_flag exit_on_signal = ATOMIC_FLAG_INIT;
std::atomic_flag exit_on_signal;

class QueryInterruptHandler : private boost::noncopyable
{
@@ -4,6 +4,8 @@
#if defined(__linux__)
# include <cstdio>
# include <mntent.h>
# include <sys/stat.h>
# include <sys/sysmacros.h>
#endif
#include <cerrno>
#include <Poco/Version.h>
@@ -13,6 +15,9 @@
#include <unistd.h>
#include <sys/types.h>
#include <utime.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>

namespace fs = std::filesystem;

@@ -24,6 +29,7 @@ namespace ErrorCodes
    extern const int LOGICAL_ERROR;
    extern const int SYSTEM_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int CANNOT_STAT;
    extern const int CANNOT_STATVFS;
    extern const int PATH_ACCESS_DENIED;
    extern const int CANNOT_CREATE_FILE;
@@ -57,6 +63,68 @@ std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path)
    return std::make_unique<TemporaryFile>(path);
}

#if !defined(__linux__)
[[noreturn]]
#endif
String getBlockDeviceId([[maybe_unused]] const String & path)
{
#if defined(__linux__)
    struct stat sb;
    if (lstat(path.c_str(), &sb))
        throwFromErrnoWithPath("Cannot lstat " + path, path, ErrorCodes::CANNOT_STAT);
    WriteBufferFromOwnString ss;
    ss << major(sb.st_dev) << ":" << minor(sb.st_dev);
    return ss.str();
#else
    throw DB::Exception("The function getDeviceId is supported on Linux only", ErrorCodes::NOT_IMPLEMENTED);
#endif
}

#if !defined(__linux__)
[[noreturn]]
#endif
BlockDeviceType getBlockDeviceType([[maybe_unused]] const String & device_id)
{
#if defined(__linux__)
    try
    {
        ReadBufferFromFile in("/sys/dev/block/" + device_id + "/queue/rotational");
        int rotational;
        readText(rotational, in);
        return rotational ? BlockDeviceType::ROT : BlockDeviceType::NONROT;
    }
    catch (...)
    {
        return BlockDeviceType::UNKNOWN;
    }
#else
    throw DB::Exception("The function getDeviceType is supported on Linux only", ErrorCodes::NOT_IMPLEMENTED);
#endif
}

#if !defined(__linux__)
[[noreturn]]
#endif
UInt64 getBlockDeviceReadAheadBytes([[maybe_unused]] const String & device_id)
{
#if defined(__linux__)
    try
    {
        ReadBufferFromFile in("/sys/dev/block/" + device_id + "/queue/read_ahead_kb");
        int read_ahead_kb;
        readText(read_ahead_kb, in);
        return read_ahead_kb * 1024;
    }
    catch (...)
    {
        return static_cast<UInt64>(-1);
    }
#else
    throw DB::Exception("The function getDeviceType is supported on Linux only", ErrorCodes::NOT_IMPLEMENTED);
#endif
}

/// Returns name of filesystem mounted to mount_point
std::filesystem::path getMountPoint(std::filesystem::path absolute_path)
{
    if (absolute_path.is_relative())
@@ -18,6 +18,31 @@ using TemporaryFile = Poco::TemporaryFile;
bool enoughSpaceInDirectory(const std::string & path, size_t data_size);
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);

// Determine what block device is responsible for specified path
#if !defined(__linux__)
[[noreturn]]
#endif
String getBlockDeviceId([[maybe_unused]] const String & path);

enum class BlockDeviceType
{
    UNKNOWN = 0, // we were unable to determine device type
    NONROT = 1,  // not a rotational device (SSD, NVME, etc)
    ROT = 2      // rotational device (HDD)
};

// Try to determine block device type
#if !defined(__linux__)
[[noreturn]]
#endif
BlockDeviceType getBlockDeviceType([[maybe_unused]] const String & device_id);

// Get size of read-ahead in bytes for specified block device
#if !defined(__linux__)
[[noreturn]]
#endif
UInt64 getBlockDeviceReadAheadBytes([[maybe_unused]] const String & device_id);

/// Returns mount point of filesystem where absolute_path (must exist) is located
std::filesystem::path getMountPoint(std::filesystem::path absolute_path);

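A minimal sketch of how a caller might combine these helpers, mirroring what the new sanityChecks() in Server.cpp does above. It assumes the functions live in namespace DB and that String aliases std::string, as elsewhere in the codebase; the function name warnIfSlowDataDisk is hypothetical.

```cpp
#include <iostream>
#include <string>
#include <Common/filesystemHelpers.h>

/// Warn when the data path lives on a rotational disk with readahead disabled.
void warnIfSlowDataDisk(const std::string & data_path)
{
    const auto device_id = DB::getBlockDeviceId(data_path);                     // e.g. "8:0" (major:minor)
    const auto device_type = DB::getBlockDeviceType(device_id);                 // UNKNOWN if sysfs cannot be read
    const auto read_ahead_bytes = DB::getBlockDeviceReadAheadBytes(device_id);  // UInt64(-1) on error

    if (device_type == DB::BlockDeviceType::ROT && read_ahead_bytes == 0)
        std::cerr << "Rotational disk with disabled readahead is in use. Performance can be degraded.\n";
}
```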
@@ -38,21 +38,7 @@ unsigned getCGroupLimitedCPUCores(unsigned default_cpu_count)
        quota_count = ceil(static_cast<float>(cgroup_quota) / static_cast<float>(cgroup_period));
    }

    // Share number (typically a number relative to 1024) (2048 typically expresses 2 CPUs worth of processing)
    // -1 for no share setup
    int cgroup_share = read_from("/sys/fs/cgroup/cpu/cpu.shares", -1);
    // Convert 1024 to no shares setup
    if (cgroup_share == 1024)
        cgroup_share = -1;

# define PER_CPU_SHARES 1024
    unsigned share_count = default_cpu_count;
    if (cgroup_share > -1)
    {
        share_count = ceil(static_cast<float>(cgroup_share) / static_cast<float>(PER_CPU_SHARES));
    }

    return std::min(default_cpu_count, std::min(share_count, quota_count));
    return std::min(default_cpu_count, quota_count);
}
#endif // OS_LINUX

@@ -91,6 +77,7 @@ unsigned getNumberOfPhysicalCPUCores()
        cpu_count = std::thread::hardware_concurrency();

#if defined(OS_LINUX)
    /// TODO: add a setting for disabling that, similar to UseContainerSupport in java
    cpu_count = getCGroupLimitedCPUCores(cpu_count);
#endif // OS_LINUX
    return cpu_count;
@@ -69,6 +69,7 @@ public:
    /// All below are parameters related to initial query.

    Interface interface = Interface::TCP;
    bool is_secure = false;

    /// For tcp
    String os_user;
@@ -86,6 +86,7 @@ NamesAndTypesList QueryLogElement::getNamesAndTypes()
        {"initial_query_start_time", std::make_shared<DataTypeDateTime>()},
        {"initial_query_start_time_microseconds", std::make_shared<DataTypeDateTime64>(6)},
        {"interface", std::make_shared<DataTypeUInt8>()},
        {"is_secure", std::make_shared<DataTypeUInt8>()},
        {"os_user", std::make_shared<DataTypeString>()},
        {"client_hostname", std::make_shared<DataTypeString>()},
        {"client_name", std::make_shared<DataTypeString>()},
@@ -275,6 +276,7 @@ void QueryLogElement::appendClientInfo(const ClientInfo & client_info, MutableCo
    columns[i++]->insert(client_info.initial_query_start_time_microseconds);

    columns[i++]->insert(UInt64(client_info.interface));
    columns[i++]->insert(static_cast<UInt64>(client_info.is_secure));

    columns[i++]->insert(client_info.os_user);
    columns[i++]->insert(client_info.client_hostname);
@@ -56,6 +56,7 @@ NamesAndTypesList QueryThreadLogElement::getNamesAndTypes()
        {"initial_query_start_time", std::make_shared<DataTypeDateTime>()},
        {"initial_query_start_time_microseconds", std::make_shared<DataTypeDateTime64>(6)},
        {"interface", std::make_shared<DataTypeUInt8>()},
        {"is_secure", std::make_shared<DataTypeUInt8>()},
        {"os_user", std::make_shared<DataTypeString>()},
        {"client_hostname", std::make_shared<DataTypeString>()},
        {"client_name", std::make_shared<DataTypeString>()},
@@ -243,7 +243,7 @@ void Session::shutdownNamedSessions()
    NamedSessionsStorage::instance().shutdown();
}

Session::Session(const ContextPtr & global_context_, ClientInfo::Interface interface_)
Session::Session(const ContextPtr & global_context_, ClientInfo::Interface interface_, bool is_secure)
    : auth_id(UUIDHelpers::generateV4()),
      global_context(global_context_),
      interface(interface_),
@@ -251,6 +251,7 @@ Session::Session(const ContextPtr & global_context_, ClientInfo::Interface inter
{
    prepared_client_info.emplace();
    prepared_client_info->interface = interface_;
    prepared_client_info->is_secure = is_secure;
}

Session::~Session()
@@ -32,7 +32,7 @@ public:
    /// Stops using named sessions. The method must be called at the server shutdown.
    static void shutdownNamedSessions();

    Session(const ContextPtr & global_context_, ClientInfo::Interface interface_);
    Session(const ContextPtr & global_context_, ClientInfo::Interface interface_, bool is_secure = false);
    ~Session();

    Session(const Session &&) = delete;
@@ -328,14 +328,20 @@ bool ParserLeftAssociativeBinaryOperatorList::parseImpl(Pos & pos, ASTPtr & node

        ASTPtr elem;
        SubqueryFunctionType subquery_function_type = SubqueryFunctionType::NONE;
        if (allow_any_all_operators && ParserKeyword("ANY").ignore(pos, expected))
            subquery_function_type = SubqueryFunctionType::ANY;
        else if (allow_any_all_operators && ParserKeyword("ALL").ignore(pos, expected))
            subquery_function_type = SubqueryFunctionType::ALL;
        else if (!(remaining_elem_parser ? remaining_elem_parser : first_elem_parser)->parse(pos, elem, expected))
            return false;

        if (comparison_expression)
        {
            if (ParserKeyword("ANY").ignore(pos, expected))
                subquery_function_type = SubqueryFunctionType::ANY;
            else if (ParserKeyword("ALL").ignore(pos, expected))
                subquery_function_type = SubqueryFunctionType::ALL;
        }

        if (subquery_function_type != SubqueryFunctionType::NONE && !ParserSubquery().parse(pos, elem, expected))
            subquery_function_type = SubqueryFunctionType::NONE;

        if (subquery_function_type == SubqueryFunctionType::NONE
            && !(remaining_elem_parser ? remaining_elem_parser : first_elem_parser)->parse(pos, elem, expected))
            return false;

        /// the first argument of the function is the previous element, the second is the next one
@@ -346,7 +352,7 @@ bool ParserLeftAssociativeBinaryOperatorList::parseImpl(Pos & pos, ASTPtr & node
        exp_list->children.push_back(node);
        exp_list->children.push_back(elem);

        if (allow_any_all_operators && subquery_function_type != SubqueryFunctionType::NONE && !modifyAST(function, subquery_function_type))
        if (comparison_expression && subquery_function_type != SubqueryFunctionType::NONE && !modifyAST(function, subquery_function_type))
            return false;

        /** special exception for the access operator to the element of the array `x[y]`, which
@@ -122,7 +122,7 @@ private:
    ParserPtr first_elem_parser;
    ParserPtr remaining_elem_parser;
    /// =, !=, <, > ALL (subquery) / ANY (subquery)
    bool allow_any_all_operators = false;
    bool comparison_expression = false;

public:
    /** `operators_` - allowed operators and their corresponding functions
@@ -133,9 +133,9 @@ public:
    }

    ParserLeftAssociativeBinaryOperatorList(Operators_t operators_,
        Operators_t overlapping_operators_to_skip_, ParserPtr && first_elem_parser_, bool allow_any_all_operators_ = false)
        Operators_t overlapping_operators_to_skip_, ParserPtr && first_elem_parser_, bool comparison_expression_ = false)
        : operators(operators_), overlapping_operators_to_skip(overlapping_operators_to_skip_),
        first_elem_parser(std::move(first_elem_parser_)), allow_any_all_operators(allow_any_all_operators_)
        first_elem_parser(std::move(first_elem_parser_)), comparison_expression(comparison_expression_)
    {
    }

@@ -922,7 +922,7 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
    setThreadName("HTTPHandler");
    ThreadStatus thread_status;

    session = std::make_unique<Session>(server.context(), ClientInfo::Interface::HTTP);
    session = std::make_unique<Session>(server.context(), ClientInfo::Interface::HTTP, request.isSecure());
    SCOPE_EXIT({ session.reset(); });
    std::optional<CurrentThread::QueryScope> query_scope;

@@ -171,7 +171,7 @@ struct SocketInterruptablePollWrapper

            if (rc >= 1 && poll_buf[0].revents & POLLIN)
                socket_ready = true;
            if (rc >= 1 && poll_buf[1].revents & POLLIN)
            if (rc >= 2 && poll_buf[1].revents & POLLIN)
                fd_ready = true;
#endif
        }
@@ -415,6 +415,13 @@ void KeeperTCPHandler::runImpl()
            log_long_operation("Polling socket");
            if (result.has_requests && !close_received)
            {
                if (in->eof())
                {
                    LOG_DEBUG(log, "Client closed connection, session id #{}", session_id);
                    keeper_dispatcher->finishSession(session_id);
                    break;
                }

                auto [received_op, received_xid] = receiveRequest();
                packageReceived();
                log_long_operation("Receiving request");
@@ -110,7 +110,7 @@ void TCPHandler::runImpl()
    setThreadName("TCPHandler");
    ThreadStatus thread_status;

    session = std::make_unique<Session>(server.context(), ClientInfo::Interface::TCP);
    session = std::make_unique<Session>(server.context(), ClientInfo::Interface::TCP, socket().secure());
    extractConnectionSettingsFromContext(server.context());

    socket().setReceiveTimeout(receive_timeout);
@@ -2,6 +2,7 @@
#include <Poco/URI.h>
#include <boost/algorithm/string/replace.hpp>
#include <re2/re2.h>
#include <filesystem>

#if USE_HDFS
#include <Common/ShellCommand.h>
@@ -25,14 +26,14 @@ const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";
const String HDFS_URL_REGEXP = "^hdfs://[^/]*/.*";

void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration & config,
    const String & config_path, bool isUser)
    const String & prefix, bool isUser)
{
    Poco::Util::AbstractConfiguration::Keys keys;

    config.keys(config_path, keys);
    config.keys(prefix, keys);
    for (const auto & key : keys)
    {
        const String key_path = config_path + "." + key;
        const String key_path = prefix + "." + key;

        String key_name;
        if (key == "hadoop_kerberos_keytab")
@@ -122,9 +123,17 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::A
        throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);

    // Shall set env LIBHDFS3_CONF *before* HDFSBuilderWrapper construction.
    const String & libhdfs3_conf = config.getString(HDFSBuilderWrapper::CONFIG_PREFIX + ".libhdfs3_conf", "");
    String libhdfs3_conf = config.getString(HDFSBuilderWrapper::CONFIG_PREFIX + ".libhdfs3_conf", "");
    if (!libhdfs3_conf.empty())
    {
        if (std::filesystem::path{libhdfs3_conf}.is_relative() && !std::filesystem::exists(libhdfs3_conf))
        {
            const String config_path = config.getString("config-file", "config.xml");
            const auto config_dir = std::filesystem::path{config_path}.remove_filename();
            if (std::filesystem::exists(config_dir / libhdfs3_conf))
                libhdfs3_conf = std::filesystem::absolute(config_dir / libhdfs3_conf);
        }

        setenv("LIBHDFS3_CONF", libhdfs3_conf.c_str(), 1);
    }
    HDFSBuilderWrapper builder;
@@ -67,7 +67,7 @@ public:
    hdfsBuilder * get() { return hdfs_builder; }

private:
    void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false);
    void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & prefix, bool isUser = false);

    String getKinitCmd();

@ -26,6 +26,7 @@
|
||||
#include <Storages/HDFS/ReadBufferFromHDFS.h>
|
||||
#include <Storages/HDFS/WriteBufferFromHDFS.h>
|
||||
#include <Storages/PartitionedSink.h>
|
||||
#include <Storages/getVirtualsForStorage.h>
|
||||
|
||||
#include <Formats/ReadSchemaUtils.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
@ -164,6 +165,13 @@ StorageHDFS::StorageHDFS(
|
||||
storage_metadata.setConstraints(constraints_);
|
||||
storage_metadata.setComment(comment);
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
auto default_virtuals = NamesAndTypesList{
|
||||
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
|
||||
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
|
||||
|
||||
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
|
||||
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
|
||||
}
|
||||
|
||||
ColumnsDescription StorageHDFS::getTableStructureFromData(
|
||||
@ -273,36 +281,6 @@ private:
|
||||
Strings::iterator uris_iter;
|
||||
};
|
||||
|
||||
Block HDFSSource::getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column)
|
||||
{
|
||||
auto header = metadata_snapshot->getSampleBlock();
|
||||
/// Note: AddingDefaultsBlockInputStream doesn't change header.
|
||||
if (need_path_column)
|
||||
header.insert(
|
||||
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
|
||||
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
|
||||
"_path"});
|
||||
if (need_file_column)
|
||||
header.insert(
|
||||
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
|
||||
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
|
||||
"_file"});
|
||||
return header;
|
||||
}
|
||||
|
||||
Block HDFSSource::getBlockForSource(
|
||||
const StorageHDFSPtr & storage,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
const ColumnsDescription & columns_description,
|
||||
bool need_path_column,
|
||||
bool need_file_column)
|
||||
{
|
||||
if (storage->isColumnOriented())
|
||||
return storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
|
||||
else
|
||||
return getHeader(storage_snapshot->metadata, need_path_column, need_file_column);
|
||||
}
|
||||
|
||||
HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(ContextPtr context_, const String & uri)
|
||||
: pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(context_, uri)) {}
|
||||
|
||||
@ -321,22 +299,28 @@ String HDFSSource::URISIterator::next()
|
||||
return pimpl->next();
|
||||
}
|
||||
|
||||
Block HDFSSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
|
||||
{
|
||||
for (const auto & virtual_column : requested_virtual_columns)
|
||||
sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
|
||||
|
||||
return sample_block;
|
||||
}
|
||||
|
||||
HDFSSource::HDFSSource(
|
||||
StorageHDFSPtr storage_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const Block & block_for_format_,
|
||||
const std::vector<NameAndTypePair> & requested_virtual_columns_,
|
||||
ContextPtr context_,
|
||||
UInt64 max_block_size_,
|
||||
bool need_path_column_,
|
||||
bool need_file_column_,
|
||||
std::shared_ptr<IteratorWrapper> file_iterator_,
|
||||
ColumnsDescription columns_description_)
|
||||
: SourceWithProgress(getBlockForSource(storage_, storage_snapshot_, columns_description_, need_path_column_, need_file_column_))
|
||||
: SourceWithProgress(getHeader(block_for_format_, requested_virtual_columns_))
|
||||
, WithContext(context_)
|
||||
, storage(std::move(storage_))
|
||||
, storage_snapshot(storage_snapshot_)
|
||||
, block_for_format(block_for_format_)
|
||||
, requested_virtual_columns(requested_virtual_columns_)
|
||||
, max_block_size(max_block_size_)
|
||||
, need_path_column(need_path_column_)
|
||||
, need_file_column(need_file_column_)
|
||||
, file_iterator(file_iterator_)
|
||||
, columns_description(std::move(columns_description_))
|
||||
{
|
||||
@ -361,14 +345,7 @@ bool HDFSSource::initialize()
|
||||
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
|
||||
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression);
|
||||
|
||||
auto get_block_for_format = [&]() -> Block
|
||||
{
|
||||
if (storage->isColumnOriented())
|
||||
return storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
|
||||
return storage_snapshot->metadata->getSampleBlock();
|
||||
};
|
||||
|
||||
auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, get_block_for_format(), max_block_size);
|
||||
auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, block_for_format, max_block_size);
|
||||
|
||||
QueryPipelineBuilder builder;
|
||||
builder.init(Pipe(input_format));
|
||||
@ -402,20 +379,21 @@ Chunk HDFSSource::generate()
|
||||
Columns columns = chunk.getColumns();
|
||||
UInt64 num_rows = chunk.getNumRows();
|
||||
|
||||
/// Enrich with virtual columns.
|
||||
if (need_path_column)
|
||||
for (const auto & virtual_column : requested_virtual_columns)
|
||||
{
|
||||
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
|
||||
columns.push_back(column->convertToFullColumnIfConst());
|
||||
}
|
||||
if (virtual_column.name == "_path")
|
||||
{
|
||||
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
|
||||
columns.push_back(column->convertToFullColumnIfConst());
|
||||
}
|
||||
else if (virtual_column.name == "_file")
|
||||
{
|
||||
size_t last_slash_pos = current_path.find_last_of('/');
|
||||
auto file_name = current_path.substr(last_slash_pos + 1);
|
||||
|
||||
if (need_file_column)
|
||||
{
|
||||
size_t last_slash_pos = current_path.find_last_of('/');
|
||||
auto file_name = current_path.substr(last_slash_pos + 1);
|
||||
|
||||
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
|
||||
columns.push_back(column->convertToFullColumnIfConst());
|
||||
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
|
||||
columns.push_back(column->convertToFullColumnIfConst());
|
||||
}
|
||||
}
|
||||
|
||||
return Chunk(std::move(columns), num_rows);
|
||||
@ -526,17 +504,6 @@ Pipe StorageHDFS::read(
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
bool need_path_column = false;
|
||||
bool need_file_column = false;
|
||||
|
||||
for (const auto & column : column_names)
|
||||
{
|
||||
if (column == "_path")
|
||||
need_path_column = true;
|
||||
if (column == "_file")
|
||||
need_file_column = true;
|
||||
}
|
||||
|
||||
std::shared_ptr<HDFSSource::IteratorWrapper> iterator_wrapper{nullptr};
|
||||
if (distributed_processing)
|
||||
{
|
||||
@ -563,27 +530,51 @@ Pipe StorageHDFS::read(
|
||||
});
|
||||
}
|
||||
|
||||
std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
|
||||
std::vector<NameAndTypePair> requested_virtual_columns;
|
||||
|
||||
for (const auto & virtual_column : getVirtuals())
|
||||
{
|
||||
if (column_names_set.contains(virtual_column.name))
|
||||
requested_virtual_columns.push_back(virtual_column);
|
||||
}
|
||||
|
||||
ColumnsDescription columns_description;
|
||||
Block block_for_format;
|
||||
if (isColumnOriented())
|
||||
{
|
||||
auto fetch_columns = column_names;
|
||||
const auto & virtuals = getVirtuals();
|
||||
std::erase_if(
|
||||
fetch_columns,
|
||||
[&](const String & col)
|
||||
{ return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); });
|
||||
|
||||
if (fetch_columns.empty())
|
||||
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()));
|
||||
|
||||
columns_description = ColumnsDescription{
|
||||
storage_snapshot->getSampleBlockForColumns(fetch_columns).getNamesAndTypesList()};
|
||||
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
|
||||
}
|
||||
else
|
||||
{
|
||||
columns_description = storage_snapshot->metadata->getColumns();
|
||||
block_for_format = storage_snapshot->metadata->getSampleBlock();
|
||||
}
|
||||
|
||||
Pipes pipes;
|
||||
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
|
||||
for (size_t i = 0; i < num_streams; ++i)
|
||||
{
|
||||
const auto get_columns_for_format = [&]() -> ColumnsDescription
|
||||
{
|
||||
if (isColumnOriented())
|
||||
return ColumnsDescription{storage_snapshot->getSampleBlockForColumns(column_names).getNamesAndTypesList()};
|
||||
else
|
||||
return storage_snapshot->metadata->getColumns();
|
||||
};
|
||||
|
||||
pipes.emplace_back(std::make_shared<HDFSSource>(
|
||||
this_ptr,
|
||||
storage_snapshot,
|
||||
block_for_format,
|
||||
requested_virtual_columns,
|
||||
context_,
|
||||
max_block_size,
|
||||
need_path_column,
|
||||
need_file_column,
|
||||
iterator_wrapper,
|
||||
get_columns_for_format()));
|
||||
columns_description));
|
||||
}
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
}
|
||||
@ -715,9 +706,7 @@ void registerStorageHDFS(StorageFactory & factory)
|
||||
|
||||
NamesAndTypesList StorageHDFS::getVirtuals() const
|
||||
{
|
||||
return NamesAndTypesList{
|
||||
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
|
||||
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
|
||||
return virtual_columns;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -76,6 +76,7 @@ private:
|
||||
const bool distributed_processing;
|
||||
ASTPtr partition_by;
|
||||
bool is_path_with_globs;
|
||||
NamesAndTypesList virtual_columns;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("StorageHDFS");
|
||||
};
|
||||
@ -110,25 +111,14 @@ public:
|
||||
using IteratorWrapper = std::function<String()>;
|
||||
using StorageHDFSPtr = std::shared_ptr<StorageHDFS>;
|
||||
|
||||
static Block getHeader(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
bool need_path_column,
|
||||
bool need_file_column);
|
||||
|
||||
static Block getBlockForSource(
|
||||
const StorageHDFSPtr & storage,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const ColumnsDescription & columns_description,
|
||||
bool need_path_column,
|
||||
bool need_file_column);
|
||||
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
|
||||
|
||||
HDFSSource(
|
||||
StorageHDFSPtr storage_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const Block & block_for_format_,
|
||||
const std::vector<NameAndTypePair> & requested_virtual_columns_,
|
||||
ContextPtr context_,
|
||||
UInt64 max_block_size_,
|
||||
bool need_path_column_,
|
||||
bool need_file_column_,
|
||||
std::shared_ptr<IteratorWrapper> file_iterator_,
|
||||
ColumnsDescription columns_description_);
|
||||
|
||||
@ -140,7 +130,8 @@ public:
|
||||
|
||||
private:
|
||||
StorageHDFSPtr storage;
|
||||
StorageSnapshotPtr storage_snapshot;
|
||||
Block block_for_format;
|
||||
std::vector<NameAndTypePair> requested_virtual_columns;
|
||||
UInt64 max_block_size;
|
||||
bool need_path_column;
|
||||
bool need_file_column;
|
||||
|
@ -256,20 +256,19 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
{
|
||||
String msg;
|
||||
if (is_extended_storage_def)
|
||||
msg += "With extended storage definition syntax storage " + args.engine_name + " requires ";
|
||||
msg += fmt::format("With extended storage definition syntax storage {} requires ", args.engine_name);
|
||||
else
|
||||
msg += "Storage " + args.engine_name + " requires ";
|
||||
msg += fmt::format("ORDER BY or PRIMARY KEY clause is missing. "
|
||||
"Consider using extended storage definition syntax with ORDER BY or PRIMARY KEY clause. "
|
||||
"With deprecated old syntax (highly not recommended) storage {} requires ", args.engine_name);
|
||||
|
||||
if (max_num_params)
|
||||
{
|
||||
if (min_num_params == max_num_params)
|
||||
msg += toString(min_num_params) + " parameters: ";
|
||||
else
|
||||
msg += toString(min_num_params) + " to " + toString(max_num_params) + " parameters: ";
|
||||
msg += needed_params;
|
||||
}
|
||||
else
|
||||
if (max_num_params == 0)
|
||||
msg += "no parameters";
|
||||
if (min_num_params == max_num_params)
|
||||
msg += fmt::format("{} parameters: {}", min_num_params, needed_params);
|
||||
else
|
||||
msg += fmt::format("{} to {} parameters: {}", min_num_params, max_num_params, needed_params);
|
||||
|
||||
|
||||
msg += getMergeTreeVerboseHelp(is_extended_storage_def);
|
||||
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include <Storages/StorageS3Settings.h>
|
||||
#include <Storages/StorageSnapshot.h>
|
||||
#include <Storages/PartitionedSink.h>
|
||||
#include <Storages/getVirtualsForStorage.h>
|
||||
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
@ -210,25 +211,16 @@ String StorageS3Source::KeysIterator::next()
|
||||
return pimpl->next();
|
||||
}
|
||||
|
||||
Block StorageS3Source::getHeader(Block sample_block, bool with_path_column, bool with_file_column)
|
||||
Block StorageS3Source::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
|
||||
{
|
||||
if (with_path_column)
|
||||
sample_block.insert(
|
||||
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
|
||||
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
|
||||
"_path"});
|
||||
if (with_file_column)
|
||||
sample_block.insert(
|
||||
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
|
||||
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
|
||||
"_file"});
|
||||
for (const auto & virtual_column : requested_virtual_columns)
|
||||
sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
|
||||
|
||||
return sample_block;
|
||||
}
|
||||
|
||||
StorageS3Source::StorageS3Source(
|
||||
bool need_path,
|
||||
bool need_file,
|
||||
const std::vector<NameAndTypePair> & requested_virtual_columns_,
|
||||
const String & format_,
|
||||
String name_,
|
||||
const Block & sample_block_,
|
||||
@ -242,7 +234,7 @@ StorageS3Source::StorageS3Source(
|
||||
const String & bucket_,
|
||||
std::shared_ptr<IteratorWrapper> file_iterator_,
|
||||
const size_t download_thread_num_)
|
||||
: SourceWithProgress(getHeader(sample_block_, need_path, need_file))
|
||||
: SourceWithProgress(getHeader(sample_block_, requested_virtual_columns_))
|
||||
, WithContext(context_)
|
||||
, name(std::move(name_))
|
||||
, bucket(bucket_)
|
||||
@ -254,8 +246,7 @@ StorageS3Source::StorageS3Source(
|
||||
, client(client_)
|
||||
, sample_block(sample_block_)
|
||||
, format_settings(format_settings_)
|
||||
, with_file_column(need_file)
|
||||
, with_path_column(need_path)
|
||||
, requested_virtual_columns(requested_virtual_columns_)
|
||||
, file_iterator(file_iterator_)
|
||||
, download_thread_num(download_thread_num_)
|
||||
{
|
||||
@ -344,16 +335,18 @@ Chunk StorageS3Source::generate()
|
||||
{
|
||||
UInt64 num_rows = chunk.getNumRows();
|
||||
|
||||
if (with_path_column)
|
||||
chunk.addColumn(DataTypeLowCardinality{std::make_shared<DataTypeString>()}
|
||||
.createColumnConst(num_rows, file_path)
|
||||
->convertToFullColumnIfConst());
|
||||
if (with_file_column)
|
||||
for (const auto & virtual_column : requested_virtual_columns)
|
||||
{
|
||||
size_t last_slash_pos = file_path.find_last_of('/');
|
||||
chunk.addColumn(DataTypeLowCardinality{std::make_shared<DataTypeString>()}
|
||||
.createColumnConst(num_rows, file_path.substr(last_slash_pos + 1))
|
||||
->convertToFullColumnIfConst());
|
||||
if (virtual_column.name == "_path")
|
||||
{
|
||||
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
|
||||
}
|
||||
else if (virtual_column.name == "_file")
|
||||
{
|
||||
size_t last_slash_pos = file_path.find_last_of('/');
|
||||
auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1));
|
||||
chunk.addColumn(column->convertToFullColumnIfConst());
|
||||
}
|
||||
}
|
||||
|
||||
return chunk;
|
||||
@ -627,6 +620,13 @@ StorageS3::StorageS3(
|
||||
storage_metadata.setConstraints(constraints_);
|
||||
storage_metadata.setComment(comment);
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
auto default_virtuals = NamesAndTypesList{
|
||||
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
|
||||
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
|
||||
|
||||
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
|
||||
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
|
||||
}
|
||||
|
||||
std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(const ClientAuthentication & client_auth, const std::vector<String> & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context)
|
||||
@ -674,14 +674,14 @@ Pipe StorageS3::read(
|
||||
updateClientAndAuthSettings(local_context, client_auth);
|
||||
|
||||
Pipes pipes;
|
||||
bool need_path_column = false;
|
||||
bool need_file_column = false;
|
||||
for (const auto & column : column_names)
|
||||
|
||||
std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
|
||||
std::vector<NameAndTypePair> requested_virtual_columns;
|
||||
|
||||
for (const auto & virtual_column : getVirtuals())
|
||||
{
|
||||
if (column == "_path")
|
||||
need_path_column = true;
|
||||
if (column == "_file")
|
||||
need_file_column = true;
|
||||
if (column_names_set.contains(virtual_column.name))
|
||||
requested_virtual_columns.push_back(virtual_column);
|
||||
}
|
||||
|
||||
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, local_context);
|
||||
@ -690,8 +690,18 @@ Pipe StorageS3::read(
|
||||
Block block_for_format;
|
||||
if (isColumnOriented())
|
||||
{
|
||||
auto fetch_columns = column_names;
|
||||
const auto & virtuals = getVirtuals();
|
||||
std::erase_if(
|
||||
fetch_columns,
|
||||
[&](const String & col)
|
||||
{ return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); });
|
||||
|
||||
if (fetch_columns.empty())
|
||||
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()));
|
||||
|
||||
columns_description = ColumnsDescription{
|
||||
storage_snapshot->getSampleBlockForColumns(column_names).getNamesAndTypesList()};
|
||||
storage_snapshot->getSampleBlockForColumns(fetch_columns).getNamesAndTypesList()};
|
||||
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
|
||||
}
|
||||
else
|
||||
@ -704,8 +714,7 @@ Pipe StorageS3::read(
|
||||
for (size_t i = 0; i < num_streams; ++i)
|
||||
{
|
||||
pipes.emplace_back(std::make_shared<StorageS3Source>(
|
||||
need_path_column,
|
||||
need_file_column,
|
||||
requested_virtual_columns,
|
||||
format_name,
|
||||
getName(),
|
||||
block_for_format,
|
||||
@ -882,6 +891,8 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
configuration.access_key_id = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else if (arg_name == "secret_access_key")
configuration.secret_access_key = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else if (arg_name == "filename")
configuration.url = std::filesystem::path(configuration.url) / arg_value->as<ASTLiteral>()->value.safeGet<String>();
else
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Unknown key-value argument `{}` for StorageS3, expected: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
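Key-value engine arguments are dispatched by name, and a `filename` argument is joined onto the base URL coming from the named collection via std::filesystem::path. A standalone sketch of that dispatch, with a plain struct and std::runtime_error standing in for StorageS3Configuration and DB::Exception (all names are illustrative):

// Simplified sketch: appending a `filename` key-value argument to the named
// collection's base URL, as the diff does with std::filesystem::path.
#include <filesystem>
#include <iostream>
#include <stdexcept>
#include <string>

struct Configuration
{
    std::string url;
    std::string access_key_id;
    std::string secret_access_key;
};

int main()
{
    Configuration configuration;
    configuration.url = "http://localhost:11111/test/";

    const std::string arg_name = "filename";
    const std::string arg_value = "test_02245";

    if (arg_name == "access_key_id")
        configuration.access_key_id = arg_value;
    else if (arg_name == "secret_access_key")
        configuration.secret_access_key = arg_value;
    else if (arg_name == "filename")
        configuration.url = std::filesystem::path(configuration.url) / arg_value;
    else
        throw std::runtime_error("Unknown key-value argument `" + arg_name + "`");

    std::cout << configuration.url << '\n'; // http://localhost:11111/test/test_02245
}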
@ -1081,9 +1092,7 @@ void registerStorageCOS(StorageFactory & factory)

NamesAndTypesList StorageS3::getVirtuals() const
{
return NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
return virtual_columns;
}

bool StorageS3::supportsPartitionBy() const
@ -58,11 +58,10 @@ public:

using IteratorWrapper = std::function<String()>;

static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);

StorageS3Source(
bool need_path,
bool need_file,
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const String & format,
String name_,
const Block & sample_block,
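getHeader() now takes the list of requested virtual columns instead of two booleans. A simplified sketch of the implied contract — the source's output header is the format sample block plus one column per requested virtual, in order — using plain pairs in place of Block columns (an illustration of the interface change, not the real implementation):

// Simplified sketch of the new getHeader() contract.
#include <cassert>
#include <string>
#include <utility>
#include <vector>

using NameAndType = std::pair<std::string, std::string>;
using Header = std::vector<NameAndType>;

Header getHeader(Header sample_block, const std::vector<NameAndType> & requested_virtual_columns)
{
    // Append one column per requested virtual, after the format's own columns.
    for (const auto & virtual_column : requested_virtual_columns)
        sample_block.push_back(virtual_column);
    return sample_block;
}

int main()
{
    Header sample = {{"a", "UInt64"}};
    auto header = getHeader(sample, {{"_path", "LowCardinality(String)"}});
    assert(header.size() == 2 && header.back().first == "_path");
}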
@ -102,8 +101,7 @@ private:
std::unique_ptr<PullingPipelineExecutor> reader;
/// onCancel and generate can be called concurrently
std::mutex reader_mutex;
bool with_file_column = false;
bool with_path_column = false;
std::vector<NameAndTypePair> requested_virtual_columns;
std::shared_ptr<IteratorWrapper> file_iterator;
size_t download_thread_num = 1;
@ -196,6 +194,7 @@ private:

ClientAuthentication client_auth;
std::vector<String> keys;
NamesAndTypesList virtual_columns;

String format_name;
UInt64 max_single_read_retries;
22
src/Storages/getVirtualsForStorage.cpp
Normal file
@ -0,0 +1,22 @@
#include "getVirtualsForStorage.h"

namespace DB
{

NamesAndTypesList getVirtualsForStorage(const NamesAndTypesList & storage_columns_, const NamesAndTypesList & default_virtuals_)
{
auto default_virtuals = default_virtuals_;
auto storage_columns = storage_columns_;
default_virtuals.sort();
storage_columns.sort();

NamesAndTypesList result_virtuals;
std::set_difference(
default_virtuals.begin(), default_virtuals.end(), storage_columns.begin(), storage_columns.end(),
std::back_inserter(result_virtuals),
[](const NameAndTypePair & lhs, const NameAndTypePair & rhs){ return lhs.name < rhs.name; });

return result_virtuals;
}

}
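getVirtualsForStorage() sorts both lists and takes the set difference by name, so a physical column that reuses a virtual name (as `_path Int32` does in the new 02245 test) shadows the default virtual. A standalone sketch of that rule with plain strings in place of NamesAndTypesList:

// Standalone sketch of the shadowing rule: a default virtual survives only if
// no storage column has the same name.
#include <algorithm>
#include <iostream>
#include <iterator>
#include <string>
#include <vector>

std::vector<std::string> getVirtualsForStorage(std::vector<std::string> storage_columns,
                                               std::vector<std::string> default_virtuals)
{
    std::sort(default_virtuals.begin(), default_virtuals.end());
    std::sort(storage_columns.begin(), storage_columns.end());

    std::vector<std::string> result_virtuals;
    std::set_difference(default_virtuals.begin(), default_virtuals.end(),
                        storage_columns.begin(), storage_columns.end(),
                        std::back_inserter(result_virtuals));
    return result_virtuals;
}

int main()
{
    // The table declares its own `_path` column, so only `_file` stays virtual.
    auto virtuals = getVirtualsForStorage({"a", "_path"}, {"_path", "_file"});
    for (const auto & name : virtuals)
        std::cout << name << '\n'; // prints: _file
}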
9
src/Storages/getVirtualsForStorage.h
Normal file
@ -0,0 +1,9 @@
#pragma once
#include <Core/NamesAndTypes.h>

namespace DB
{

NamesAndTypesList getVirtualsForStorage(const NamesAndTypesList & storage_columns_, const NamesAndTypesList & default_virtuals_);

}
@ -12,6 +12,7 @@
#include <Storages/StorageS3.h>
#include <Formats/FormatFactory.h>
#include "registerTableFunctions.h"
#include <filesystem>


namespace DB
@ -37,6 +38,8 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
s3_configuration.access_key_id = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else if (arg_name == "secret_access_key")
s3_configuration.secret_access_key = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else if (arg_name == "filename")
s3_configuration.url = std::filesystem::path(s3_configuration.url) / arg_value->as<ASTLiteral>()->value.safeGet<String>();
else
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
}
@ -22,5 +22,11 @@
</header>
</headers>
</url_with_headers>
<s3_conn>
<url>http://localhost:11111/test/</url>
<access_key_id>test</access_key_id>
<secret_access_key>testtest</secret_access_key>
<structure>auto</structure>
</s3_conn>
</named_collections>
</clickhouse>
@ -554,6 +554,26 @@ def test_insert_select_schema_inference(started_cluster):
    assert int(result) == 1


def test_virtual_columns_2(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    table_function = (
        f"hdfs('hdfs://hdfs1:9000/parquet_2', 'Parquet', 'a Int32, b String')"
    )
    node1.query(f"insert into table function {table_function} SELECT 1, 'kek'")

    result = node1.query(f"SELECT _path FROM {table_function}")
    assert result.strip() == "hdfs://hdfs1:9000/parquet_2"

    table_function = (
        f"hdfs('hdfs://hdfs1:9000/parquet_3', 'Parquet', 'a Int32, _path String')"
    )
    node1.query(f"insert into table function {table_function} SELECT 1, 'kek'")

    result = node1.query(f"SELECT _path FROM {table_function}")
    assert result.strip() == "kek"


if __name__ == "__main__":
    cluster.start()
    input("Cluster created, press any key to destroy...")
@ -49,3 +49,7 @@ select 11 > all (select 11 from numbers(10));
0
select 11 >= all (select 11 from numbers(10));
1
select sum(number) = any(number) from numbers(1) group by number;
1
select 1 == any (1);
1
@ -24,3 +24,5 @@ select 11 <= all (select number from numbers(11));
select 11 < all (select 11 from numbers(10));
select 11 > all (select 11 from numbers(10));
select 11 >= all (select 11 from numbers(10));
select sum(number) = any(number) from numbers(1) group by number;
select 1 == any (1);
15
tests/queries/0_stateless/02245_s3_virtual_columns.reference
Normal file
@ -0,0 +1,15 @@
-- { echo }
drop table if exists test_02245;
create table test_02245 (a UInt64) engine = S3(s3_conn, filename='test_02245', format=Parquet);
insert into test_02245 select 1 settings s3_truncate_on_insert=1;
select * from test_02245;
1
select _path from test_02245;
test/test_02245
drop table if exists test_02245_2;
create table test_02245_2 (a UInt64, _path Int32) engine = S3(s3_conn, filename='test_02245_2', format=Parquet);
insert into test_02245_2 select 1, 2 settings s3_truncate_on_insert=1;
select * from test_02245_2;
1 2
select _path from test_02245_2;
2
15
tests/queries/0_stateless/02245_s3_virtual_columns.sql
Normal file
@ -0,0 +1,15 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS

-- { echo }
drop table if exists test_02245;
create table test_02245 (a UInt64) engine = S3(s3_conn, filename='test_02245', format=Parquet);
insert into test_02245 select 1 settings s3_truncate_on_insert=1;
select * from test_02245;
select _path from test_02245;

drop table if exists test_02245_2;
create table test_02245_2 (a UInt64, _path Int32) engine = S3(s3_conn, filename='test_02245_2', format=Parquet);
insert into test_02245_2 select 1, 2 settings s3_truncate_on_insert=1;
select * from test_02245_2;
select _path from test_02245_2;
@ -0,0 +1,4 @@
1 0
1 1
2 0
2 1
22
tests/queries/0_stateless/02246_is_secure_query_log.sh
Executable file
@ -0,0 +1,22 @@
#!/usr/bin/env bash
# Tags: no-fasttest

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

${CLICKHOUSE_CLIENT} --log_queries=1 --query_id "2246_${CLICKHOUSE_DATABASE}_client_nonsecure" -q "select 1 Format Null"
${CLICKHOUSE_CLIENT} -q "system flush logs"
${CLICKHOUSE_CLIENT} -q "select interface, is_secure from system.query_log where query_id = '2246_${CLICKHOUSE_DATABASE}_client_nonsecure' and type = 'QueryFinish' and current_database = currentDatabase()"

${CLICKHOUSE_CLIENT_SECURE} --log_queries=1 --query_id "2246_${CLICKHOUSE_DATABASE}_client_secure" -q "select 1 Format Null"
${CLICKHOUSE_CLIENT} -q "system flush logs"
${CLICKHOUSE_CLIENT} -q "select interface, is_secure from system.query_log where query_id = '2246_${CLICKHOUSE_DATABASE}_client_secure' and type = 'QueryFinish' and current_database = currentDatabase()"

${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&log_queries=1&query_id=2246_${CLICKHOUSE_DATABASE}_http_nonsecure" -d "select 1 Format Null"
${CLICKHOUSE_CLIENT} -q "system flush logs"
${CLICKHOUSE_CLIENT} -q "select interface, is_secure from system.query_log where query_id = '2246_${CLICKHOUSE_DATABASE}_http_nonsecure' and type = 'QueryFinish' and current_database = currentDatabase()"

${CLICKHOUSE_CURL} -sSk "${CLICKHOUSE_URL_HTTPS}&log_queries=1&query_id=2246_${CLICKHOUSE_DATABASE}_http_secure" -d "select 1 Format Null"
${CLICKHOUSE_CLIENT} -q "system flush logs"
${CLICKHOUSE_CLIENT} -q "select interface, is_secure from system.query_log where query_id = '2246_${CLICKHOUSE_DATABASE}_http_secure' and type = 'QueryFinish' and current_database = currentDatabase()"