Merge branch 'master' into test-gather-utils-build-time

Nikolai Kochetov 2020-09-16 14:11:27 +03:00
commit b6756472d7
47 changed files with 515 additions and 178 deletions

View File

@ -1,4 +1,4 @@
<yandex>
<yandex>
<http_port remove="remove"/>
<mysql_port remove="remove"/>
<interserver_http_port remove="remove"/>
@ -22,4 +22,6 @@
<uncompressed_cache_size>1000000000</uncompressed_cache_size>
<asynchronous_metrics_update_period_s>10</asynchronous_metrics_update_period_s>
<remap_executable replace="replace">true</remap_executable>
</yandex>

View File

@ -246,7 +246,7 @@ Installing unixODBC and the ODBC driver for PostgreSQL:
$ sudo apt-get install -y unixodbc odbcinst odbc-postgresql
```
Configuring `/etc/odbc.ini` (or `~/.odbc.ini`):
Configuring `/etc/odbc.ini` (or `~/.odbc.ini` if you are signed in under the user account that runs ClickHouse):
``` text
[DEFAULT]
@ -321,7 +321,7 @@ You may need to edit `odbc.ini` to specify the full path to the library with the
Ubuntu OS.
Installing the driver: :
Installing the ODBC driver for connecting to MS SQL:
``` bash
$ sudo apt-get install tdsodbc freetds-bin sqsh
@ -329,7 +329,7 @@ $ sudo apt-get install tdsodbc freetds-bin sqsh
Configuring the driver:
``` bash
```bash
$ cat /etc/freetds/freetds.conf
...
@ -339,8 +339,11 @@ Configuring the driver:
tds version = 7.0
client charset = UTF-8
# test TDS connection
$ sqsh -S MSSQL -D database -U user -P password
$ cat /etc/odbcinst.ini
...
[FreeTDS]
Description = FreeTDS
@ -349,8 +352,8 @@ Configuring the driver:
FileUsage = 1
UsageCount = 5
$ cat ~/.odbc.ini
...
$ cat /etc/odbc.ini
# $ cat ~/.odbc.ini # if you are signed in under the user account that runs ClickHouse
[MSSQL]
Description = FreeTDS
@ -360,8 +363,15 @@ Configuring the driver:
UID = test
PWD = test
Port = 1433
# (optional) test the ODBC connection (to use the isql tool, install the [unixodbc](https://packages.debian.org/sid/unixodbc) package)
$ isql -v MSSQL "user" "password"
```
Remarks:
- To determine the earliest TDS version supported by a particular SQL Server version, refer to the product documentation or see [MS-TDS Product Behavior](https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-tds/135d0ebe-5c4c-4a94-99bf-1811eccb9f4a).
Configuring the dictionary in ClickHouse:
``` xml

View File

@ -111,7 +111,7 @@ dictHas('dict_name', id_expr)
**Parameters**
- `dict_name` — Name of the dictionary. [String literal](../../sql-reference/syntax.md#syntax-string-literal).
- `id_expr` — Key value. [Expression](../../sql-reference/syntax.md#syntax-expressions) returning a [UInt64](../../sql-reference/data-types/int-uint.md)-type value.
- `id_expr` — Key value. [Expression](../../sql-reference/syntax.md#syntax-expressions) returning a [UInt64](../../sql-reference/data-types/int-uint.md) or [Tuple](../../sql-reference/data-types/tuple.md)-type value depending on the dictionary configuration.
**Returned value**
@ -189,8 +189,8 @@ dictGet[Type]OrDefault('dict_name', 'attr_name', id_expr, default_value_expr)
- `dict_name` — Name of the dictionary. [String literal](../../sql-reference/syntax.md#syntax-string-literal).
- `attr_name` — Name of the column of the dictionary. [String literal](../../sql-reference/syntax.md#syntax-string-literal).
- `id_expr` — Key value. [Expression](../../sql-reference/syntax.md#syntax-expressions) returning a [UInt64](../../sql-reference/data-types/int-uint.md)-type value.
- `default_value_expr` — Value which is returned if the dictionary doesnt contain a row with the `id_expr` key. [Expression](../../sql-reference/syntax.md#syntax-expressions) returning a value in the data type configured for the `attr_name` attribute.
- `id_expr` — Key value. [Expression](../../sql-reference/syntax.md#syntax-expressions) returning a [UInt64](../../sql-reference/data-types/int-uint.md) or [Tuple](../../sql-reference/data-types/tuple.md)-type value depending on the dictionary configuration.
- `default_value_expr` — Value returned if the dictionary doesn't contain a row with the `id_expr` key. [Expression](../../sql-reference/syntax.md#syntax-expressions) returning a value in the data type configured for the `attr_name` attribute.
**Returned value**

View File

@ -103,7 +103,7 @@ dictHas('dict_name', id)
**Parameters**
- `dict_name` — Name of the dictionary. [String literal](../syntax.md#syntax-string-literal).
- `id_expr` — Dictionary key value. [Expression](../syntax.md#syntax-expressions) returning a [UInt64](../../sql-reference/functions/ext-dict-functions.md)-type value.
- `id_expr` — Dictionary key value. [Expression](../syntax.md#syntax-expressions) returning a [UInt64](../../sql-reference/functions/ext-dict-functions.md) or [Tuple](../../sql-reference/functions/ext-dict-functions.md)-type value, depending on the dictionary configuration.
**Returned value**
@ -179,7 +179,7 @@ dictGet[Type]OrDefault('dict_name', 'attr_name', id_expr, default_value_expr)
- `dict_name` — Name of the dictionary. [String literal](../syntax.md#syntax-string-literal).
- `attr_name` — Name of the dictionary column. [String literal](../syntax.md#syntax-string-literal).
- `id_expr` — Dictionary key value. [Expression](../syntax.md#syntax-expressions) returning a [UInt64](../../sql-reference/functions/ext-dict-functions.md)-type value.
- `id_expr` — Dictionary key value. [Expression](../syntax.md#syntax-expressions) returning a [UInt64](../../sql-reference/functions/ext-dict-functions.md) or [Tuple](../../sql-reference/functions/ext-dict-functions.md)-type value, depending on the dictionary configuration.
- `default_value_expr` — Value returned if the dictionary doesn't contain a row with the `id_expr` key. [Expression](../syntax.md#syntax-expressions) returning a value in the data type configured for the `attr_name` attribute.
**Returned value**

View File

@ -32,6 +32,7 @@
#include <Common/getExecutablePath.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/ThreadStatus.h>
#include <Common/remapExecutable.h>
#include <IO/HTTPCommon.h>
#include <IO/UseSSL.h>
#include <Interpreters/AsynchronousMetrics.h>
@ -305,6 +306,13 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// After full config loaded
{
if (config().getBool("remap_executable", false))
{
LOG_DEBUG(log, "Will remap executable in memory.");
remapExecutable();
LOG_DEBUG(log, "The code in memory has been successfully remapped.");
}
if (config().getBool("mlock_executable", false))
{
if (hasLinuxCapability(CAP_IPC_LOCK))

View File

@ -302,6 +302,9 @@
-->
<mlock_executable>true</mlock_executable>
<!-- Reallocate memory for machine code ("text") using huge pages. Highly experimental. -->
<remap_executable>false</remap_executable>
<!-- Configuration of clusters that could be used in Distributed tables.
https://clickhouse.tech/docs/en/operations/table_engines/distributed/
-->

View File

@ -186,7 +186,7 @@ void AccessControlManager::addUsersConfigStorage(
{
if (auto users_config_storage = typeid_cast<std::shared_ptr<UsersConfigAccessStorage>>(storage))
{
if (users_config_storage->getStoragePath() == users_config_path_)
if (users_config_storage->isPathEqual(users_config_path_))
return;
}
}
@ -229,7 +229,7 @@ void AccessControlManager::addDiskStorage(const String & storage_name_, const St
{
if (auto disk_storage = typeid_cast<std::shared_ptr<DiskAccessStorage>>(storage))
{
if (disk_storage->isStoragePathEqual(directory_))
if (disk_storage->isPathEqual(directory_))
{
if (readonly_)
disk_storage->setReadOnly(readonly_);

View File

@ -33,6 +33,9 @@
#include <Interpreters/InterpreterShowGrantsQuery.h>
#include <Common/quoteString.h>
#include <Core/Defines.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/algorithm_ext/push_back.hpp>
@ -342,9 +345,22 @@ DiskAccessStorage::~DiskAccessStorage()
}
bool DiskAccessStorage::isStoragePathEqual(const String & directory_path_) const
String DiskAccessStorage::getStorageParamsJSON() const
{
return getStoragePath() == makeDirectoryPathCanonical(directory_path_);
std::lock_guard lock{mutex};
Poco::JSON::Object json;
json.set("path", directory_path);
if (readonly)
json.set("readonly", readonly.load());
std::ostringstream oss;
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}
bool DiskAccessStorage::isPathEqual(const String & directory_path_) const
{
return getPath() == makeDirectoryPathCanonical(directory_path_);
}

View File

@ -18,12 +18,13 @@ public:
~DiskAccessStorage() override;
const char * getStorageType() const override { return STORAGE_TYPE; }
String getStorageParamsJSON() const override;
String getStoragePath() const override { return directory_path; }
bool isStoragePathEqual(const String & directory_path_) const;
String getPath() const { return directory_path; }
bool isPathEqual(const String & directory_path_) const;
void setReadOnly(bool readonly_) { readonly = readonly_; }
bool isStorageReadOnly() const override { return readonly; }
bool isReadOnly() const { return readonly; }
private:
std::optional<UUID> findImpl(EntityType type, const String & name) const override;

View File

@ -25,8 +25,9 @@ public:
/// Returns the name of this storage.
const String & getStorageName() const { return storage_name; }
virtual const char * getStorageType() const = 0;
virtual String getStoragePath() const { return {}; }
virtual bool isStorageReadOnly() const { return false; }
/// Returns a JSON with the parameters of the storage. It's up to the storage type to fill the JSON.
virtual String getStorageParamsJSON() const { return "{}"; }
using EntityType = IAccessEntity::Type;
using EntityTypeInfo = IAccessEntity::TypeInfo;
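For illustration only (not part of this commit), here is a minimal sketch of how a storage subclass might fill that JSON, following the same Poco::JSON pattern used by DiskAccessStorage above and UsersConfigAccessStorage below; the class name MyAccessStorage and its members are hypothetical:
``` cpp
#include <sstream>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>

String MyAccessStorage::getStorageParamsJSON() const   /// hypothetical subclass of IAccessStorage
{
    Poco::JSON::Object json;
    json.set("path", my_path);        /// hypothetical member holding the storage location
    if (my_readonly)                  /// only emit keys that are meaningful for this storage type
        json.set("readonly", true);
    std::ostringstream oss;
    Poco::JSON::Stringifier::stringify(json, oss);
    return oss.str();
}
```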

View File

@ -10,6 +10,9 @@
#include <Core/Settings.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/MD5Engine.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <common/logger_useful.h>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/adaptor/map.hpp>
@ -482,12 +485,29 @@ UsersConfigAccessStorage::UsersConfigAccessStorage(const String & storage_name_,
UsersConfigAccessStorage::~UsersConfigAccessStorage() = default;
String UsersConfigAccessStorage::getStoragePath() const
String UsersConfigAccessStorage::getStorageParamsJSON() const
{
std::lock_guard lock{load_mutex};
Poco::JSON::Object json;
if (!path.empty())
json.set("path", path);
std::ostringstream oss;
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}
String UsersConfigAccessStorage::getPath() const
{
std::lock_guard lock{load_mutex};
return path;
}
bool UsersConfigAccessStorage::isPathEqual(const String & path_) const
{
return getPath() == path_;
}
void UsersConfigAccessStorage::setConfig(const Poco::Util::AbstractConfiguration & config)
{

View File

@ -26,8 +26,10 @@ public:
~UsersConfigAccessStorage() override;
const char * getStorageType() const override { return STORAGE_TYPE; }
String getStoragePath() const override;
bool isStorageReadOnly() const override { return true; }
String getStorageParamsJSON() const override;
String getPath() const;
bool isPathEqual(const String & path_) const;
void setConfig(const Poco::Util::AbstractConfiguration & config);

View File

@ -117,6 +117,10 @@ endif ()
add_library(clickhouse_common_io ${clickhouse_common_io_headers} ${clickhouse_common_io_sources})
if (SPLIT_SHARED_LIBRARIES)
target_compile_definitions(clickhouse_common_io PRIVATE SPLIT_SHARED_LIBRARIES)
endif ()
add_library (clickhouse_malloc OBJECT Common/malloc.cpp)
set_source_files_properties(Common/malloc.cpp PROPERTIES COMPILE_FLAGS "-fno-builtin")

View File

@ -0,0 +1,201 @@
#if defined(__linux__) && defined(__amd64__) && defined(__SSE2__) && !defined(SANITIZER) && defined(NDEBUG) && !defined(SPLIT_SHARED_LIBRARIES)
#include <sys/mman.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <emmintrin.h>
#include <utility>
#include <Common/StringUtils/StringUtils.h>
#include <Common/hex.h>
#include <Common/Exception.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include "remapExecutable.h"
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_ALLOCATE_MEMORY;
}
namespace
{
uintptr_t readAddressHex(DB::ReadBuffer & in)
{
uintptr_t res = 0;
while (!in.eof())
{
if (isHexDigit(*in.position()))
{
res *= 16;
res += unhex(*in.position());
++in.position();
}
else
break;
}
return res;
}
/** Find the address and size of the mapped memory region pointed to by ptr.
*/
std::pair<void *, size_t> getMappedArea(void * ptr)
{
using namespace DB;
uintptr_t uintptr = reinterpret_cast<uintptr_t>(ptr);
ReadBufferFromFile in("/proc/self/maps");
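/// Each line of /proc/self/maps begins with the mapping boundaries written as hexadecimal
/// addresses separated by a dash, e.g. "7f1234560000-7f1234580000 r-xp ...";
/// only these two numbers are needed here.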
while (!in.eof())
{
uintptr_t begin = readAddressHex(in);
assertChar('-', in);
uintptr_t end = readAddressHex(in);
skipToNextLineOrEOF(in);
if (begin <= uintptr && uintptr < end)
return {reinterpret_cast<void *>(begin), end - begin};
}
throw Exception("Cannot find mapped area for pointer", ErrorCodes::LOGICAL_ERROR);
}
__attribute__((__noinline__)) int64_t our_syscall(...)
{
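/// Linux x86-64 syscalls take the call number in rax and arguments in rdi, rsi, rdx, r10, r8, r9.
/// This variadic function receives the syscall number as its first C argument (in rdi),
/// so the assembly below shifts every register by one position and loads the seventh
/// C argument (the sixth syscall argument) from the stack into r9 before executing "syscall".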
__asm__ __volatile__ (R"(
movq %%rdi,%%rax;
movq %%rsi,%%rdi;
movq %%rdx,%%rsi;
movq %%rcx,%%rdx;
movq %%r8,%%r10;
movq %%r9,%%r8;
movq 8(%%rsp),%%r9;
syscall;
ret
)" : : : "memory");
return 0;
}
__attribute__((__noinline__)) void remapToHugeStep3(void * scratch, size_t size, size_t offset)
{
/// The function should not use the stack, otherwise various optimizations, including "omit-frame-pointer" may break the code.
/// Unmap the scratch area.
our_syscall(SYS_munmap, scratch, size);
/** The return address of this function points to the scratch area (because it was called from there).
* But the scratch area no longer exists. We should correct the return address by subtracting the offset.
*/
__asm__ __volatile__("subq %0, 8(%%rsp)" : : "r"(offset) : "memory");
}
__attribute__((__noinline__)) void remapToHugeStep2(void * begin, size_t size, void * scratch)
{
/** Unmap the old memory region with the code of our program.
* Our instruction pointer is located inside the scratch area, so this function can keep executing after the old code is unmapped.
* But it cannot call any other functions, because they are no longer available at their usual addresses
* - that's why we have to use the "our_syscall" function and a substitution for memcpy.
* (Relative addressing may continue to work, but we should not assume that.)
*/
int64_t offset = reinterpret_cast<intptr_t>(scratch) - reinterpret_cast<intptr_t>(begin);
int64_t (*syscall_func)(...) = reinterpret_cast<int64_t (*)(...)>(reinterpret_cast<intptr_t>(our_syscall) + offset);
int64_t munmap_res = syscall_func(SYS_munmap, begin, size);
if (munmap_res != 0)
return;
/// Map new anonymous memory region in place of old region with code.
int64_t mmap_res = syscall_func(SYS_mmap, begin, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_FIXED, -1, 0);
if (-1 == mmap_res)
syscall_func(SYS_exit, 1);
/// As the memory region is anonymous, we can do madvise with MADV_HUGEPAGE.
syscall_func(SYS_madvise, begin, size, MADV_HUGEPAGE);
/// Copy the code from scratch area to the old memory location.
{
__m128i * __restrict dst = reinterpret_cast<__m128i *>(begin);
const __m128i * __restrict src = reinterpret_cast<const __m128i *>(scratch);
const __m128i * __restrict src_end = reinterpret_cast<const __m128i *>(reinterpret_cast<const char *>(scratch) + size);
while (src < src_end)
{
_mm_storeu_si128(dst, _mm_loadu_si128(src));
++dst;
++src;
}
}
/// Make the memory area with the code executable and non-writable.
syscall_func(SYS_mprotect, begin, size, PROT_READ | PROT_EXEC);
/** Step 3 function should unmap the scratch area.
* The currently executed code is located in the scratch area and cannot be removed here.
* We have to call another function and use its address from the original location (not in scratch area).
* To do it, we obtain its pointer and call by pointer.
*/
void(* volatile step3)(void*, size_t, size_t) = remapToHugeStep3;
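/// The "volatile" prevents the compiler from inlining this call or turning it into a rip-relative call,
/// which would keep execution inside the scratch copy instead of jumping back to the original location.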
step3(scratch, size, offset);
}
__attribute__((__noinline__)) void remapToHugeStep1(void * begin, size_t size)
{
/// Allocate scratch area and copy the code there.
void * scratch = mmap(nullptr, size, PROT_READ | PROT_WRITE | PROT_EXEC, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == scratch)
throwFromErrno(fmt::format("Cannot mmap {} bytes", size), ErrorCodes::CANNOT_ALLOCATE_MEMORY);
memcpy(scratch, begin, size);
/// Offset to the scratch area from previous location.
int64_t offset = reinterpret_cast<intptr_t>(scratch) - reinterpret_cast<intptr_t>(begin);
/// Jump to the next function inside the scratch area.
reinterpret_cast<void(*)(void*, size_t, void*)>(reinterpret_cast<intptr_t>(remapToHugeStep2) + offset)(begin, size, scratch);
}
}
void remapExecutable()
{
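/// The address of this very function lies inside the program's code ("text") mapping,
/// so it can be used to look up that mapping's boundaries in /proc/self/maps.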
auto [begin, size] = getMappedArea(reinterpret_cast<void *>(remapExecutable));
remapToHugeStep1(begin, size);
}
}
#else
namespace DB
{
void remapExecutable() {}
}
#endif

View File

@ -0,0 +1,7 @@
namespace DB
{
/// This function tries to remap the machine code ("text") of the running program onto huge pages so that it executes more efficiently.
void remapExecutable();
}

View File

@ -74,6 +74,7 @@ SRCS(
QueryProfiler.cpp
quoteString.cpp
randomSeed.cpp
remapExecutable.cpp
RemoteHostFilter.cpp
renameat2.cpp
RWLock.cpp

View File

@ -63,7 +63,7 @@ Block IBlockInputStream::read()
if (enabled_extremes)
updateExtremes(res);
if (limits.mode == LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
if (limits.mode == LimitsMode::LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
limit_exceeded_need_break = true;
if (quota)
@ -209,11 +209,11 @@ void IBlockInputStream::checkQuota(Block & block)
{
switch (limits.mode)
{
case LIMITS_TOTAL:
case LimitsMode::LIMITS_TOTAL:
/// Checked in `progress` method.
break;
case LIMITS_CURRENT:
case LimitsMode::LIMITS_CURRENT:
{
UInt64 total_elapsed = info.total_stopwatch.elapsedNanoseconds();
quota->used({Quota::RESULT_ROWS, block.rows()}, {Quota::RESULT_BYTES, block.bytes()}, {Quota::EXECUTION_TIME, total_elapsed - prev_elapsed});
@ -242,7 +242,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
/** Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
* NOTE: Maybe it makes sense to have them checked directly in ProcessList?
*/
if (limits.mode == LIMITS_TOTAL)
if (limits.mode == LimitsMode::LIMITS_TOTAL)
{
if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read",
ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
@ -262,7 +262,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
if (quota && limits.mode == LIMITS_TOTAL)
if (quota && limits.mode == LimitsMode::LIMITS_TOTAL)
quota->used({Quota::READ_ROWS, value.read_rows}, {Quota::READ_BYTES, value.read_bytes});
}

View File

@ -5,6 +5,7 @@
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/ExecutionSpeedLimits.h>
#include <DataStreams/StreamLocalLimits.h>
#include <IO/Progress.h>
#include <Storages/TableLockHolder.h>
#include <Common/TypePromotion.h>
@ -173,38 +174,13 @@ public:
bool isCancelled() const;
bool isCancelledOrThrowIfKilled() const;
/** What limitations and quotas should be checked.
* LIMITS_CURRENT - checks amount of data returned by current stream only (BlockStreamProfileInfo is used for check).
* Currently it is used in root streams to check max_result_{rows,bytes} limits.
* LIMITS_TOTAL - checks total amount of read data from leaf streams (i.e. data read from disk and remote servers).
* It is checks max_{rows,bytes}_to_read in progress handler and use info from ProcessListElement::progress_in for this.
* Currently this check is performed only in leaf streams.
*/
enum LimitsMode
{
LIMITS_CURRENT,
LIMITS_TOTAL,
};
/// It is a subset of limitations from Limits.
struct LocalLimits
{
LimitsMode mode = LIMITS_CURRENT;
SizeLimits size_limits;
ExecutionSpeedLimits speed_limits;
OverflowMode timeout_overflow_mode = OverflowMode::THROW;
};
/** Set limitations that checked on each block. */
virtual void setLimits(const LocalLimits & limits_)
virtual void setLimits(const StreamLocalLimits & limits_)
{
limits = limits_;
}
const LocalLimits & getLimits() const
const StreamLocalLimits & getLimits() const
{
return limits;
}
@ -268,7 +244,7 @@ private:
/// Limitations and quotas.
LocalLimits limits;
StreamLocalLimits limits;
std::shared_ptr<const EnabledQuota> quota; /// If nullptr - the quota is not used.
UInt64 prev_elapsed = 0;

View File

@ -0,0 +1,33 @@
#pragma once
#include <DataStreams/SizeLimits.h>
#include <DataStreams/ExecutionSpeedLimits.h>
namespace DB
{
/** What limitations and quotas should be checked.
* LIMITS_CURRENT - checks amount of data returned by current stream only (BlockStreamProfileInfo is used for check).
* Currently it is used in root streams to check max_result_{rows,bytes} limits.
* LIMITS_TOTAL - checks total amount of read data from leaf streams (i.e. data read from disk and remote servers).
* It checks max_{rows,bytes}_to_read in the progress handler and uses info from ProcessListElement::progress_in for this.
* Currently this check is performed only in leaf streams.
*/
enum class LimitsMode
{
LIMITS_CURRENT,
LIMITS_TOTAL,
};
/// It is a subset of limitations from Limits.
struct StreamLocalLimits
{
LimitsMode mode = LimitsMode::LIMITS_CURRENT;
SizeLimits size_limits;
ExecutionSpeedLimits speed_limits;
OverflowMode timeout_overflow_mode = OverflowMode::THROW;
};
}
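As a usage note, the following is a hedged sketch of filling these limits for the result-size check, mirroring how the new struct is used elsewhere in this diff (for example in InterpreterWatchQuery and executeQuery.cpp); the `settings` and `stream` objects are assumed to exist in the caller:
``` cpp
/// Sketch only: limit what the root stream may return, per the LIMITS_CURRENT mode above.
StreamLocalLimits limits;
limits.mode = LimitsMode::LIMITS_CURRENT;
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
limits.speed_limits.max_execution_time = settings.max_execution_time;
stream->setLimits(limits);   /// or pipe.setLimits(limits) when working with processors
```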

View File

@ -1071,6 +1071,37 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
executeSubqueriesInSetsAndJoins(query_plan, subqueries_for_sets);
}
static StreamLocalLimits getLimitsForStorage(const Settings & settings, const SelectQueryOptions & options)
{
StreamLocalLimits limits;
limits.mode = LimitsMode::LIMITS_TOTAL;
limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read,
settings.read_overflow_mode);
limits.speed_limits.max_execution_time = settings.max_execution_time;
limits.timeout_overflow_mode = settings.timeout_overflow_mode;
/** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
* because the initiating server has a summary of the execution of the request on all servers.
*
* But limits on data size to read and maximum execution time are reasonable to check both on initiator and
* additionally on each remote server, because these limits are checked per block of data processed,
* and remote servers may process way more blocks of data than are received by initiator.
*
* The limits that throttle maximum execution speed are also checked on all servers.
*/
if (options.to_stage == QueryProcessingStage::Complete)
{
limits.speed_limits.min_execution_rps = settings.min_execution_speed;
limits.speed_limits.min_execution_bps = settings.min_execution_speed_bytes;
}
limits.speed_limits.max_execution_rps = settings.max_execution_speed;
limits.speed_limits.max_execution_bps = settings.max_execution_speed_bytes;
limits.speed_limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
return limits;
}
void InterpreterSelectQuery::executeFetchColumns(
QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan,
const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere)
@ -1409,12 +1440,18 @@ void InterpreterSelectQuery::executeFetchColumns(
query_info.input_order_info = query_info.order_optimizer->getInputOrder(storage, metadata_snapshot);
}
auto read_step = std::make_unique<ReadFromStorageStep>(
table_lock, metadata_snapshot, options, storage,
required_columns, query_info, context, processing_stage, max_block_size, max_streams);
StreamLocalLimits limits;
std::shared_ptr<const EnabledQuota> quota;
read_step->setStepDescription("Read from " + storage->getName());
query_plan.addStep(std::move(read_step));
/// Set the limits and quota for reading data, the speed and time of the query.
if (!options.ignore_limits)
limits = getLimitsForStorage(settings, options);
if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
quota = context->getQuota();
storage->read(query_plan, table_lock, metadata_snapshot, limits, std::move(quota),
required_columns, query_info, context, processing_stage, max_block_size, max_streams);
}
else
throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR);

View File

@ -17,6 +17,7 @@ limitations under the License. */
#include <Access/AccessFlags.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/StreamLocalLimits.h>
namespace DB
@ -76,8 +77,8 @@ BlockIO InterpreterWatchQuery::execute()
/// Constraints on the result, the quota on the result, and also callback for progress.
if (IBlockInputStream * stream = dynamic_cast<IBlockInputStream *>(streams[0].get()))
{
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_CURRENT;
StreamLocalLimits limits;
limits.mode = LimitsMode::LIMITS_CURRENT;
limits.size_limits.max_rows = settings.max_result_rows;
limits.size_limits.max_bytes = settings.max_result_bytes;
limits.size_limits.overflow_mode = settings.result_overflow_mode;

View File

@ -382,10 +382,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
}
IBlockInputStream::LocalLimits limits;
StreamLocalLimits limits;
if (!interpreter->ignoreLimits())
{
limits.mode = IBlockInputStream::LIMITS_CURRENT;
limits.mode = LimitsMode::LIMITS_CURRENT;
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}

View File

@ -103,7 +103,7 @@ void PipelineExecutingBlockInputStream::setProcessListElement(QueryStatus * elem
pipeline->setProcessListElement(elem);
}
void PipelineExecutingBlockInputStream::setLimits(const IBlockInputStream::LocalLimits & limits_)
void PipelineExecutingBlockInputStream::setLimits(const StreamLocalLimits & limits_)
{
throwIfExecutionStarted(is_execution_started, "setLimits");

View File

@ -24,7 +24,7 @@ public:
/// Implement IBlockInputStream methods via QueryPipeline.
void setProgressCallback(const ProgressCallback & callback) final;
void setProcessListElement(QueryStatus * elem) final;
void setLimits(const LocalLimits & limits_) final;
void setLimits(const StreamLocalLimits & limits_) final;
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final;
void addTotalRowsApprox(size_t value) final;

View File

@ -779,7 +779,7 @@ void Pipe::transform(const Transformer & transformer)
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
void Pipe::setLimits(const ISourceWithProgress::LocalLimits & limits)
void Pipe::setLimits(const StreamLocalLimits & limits)
{
for (auto & processor : processors)
{

View File

@ -6,6 +6,8 @@
namespace DB
{
struct StreamLocalLimits;
class Pipe;
using Pipes = std::vector<Pipe>;
@ -94,7 +96,7 @@ public:
const Processors & getProcessors() const { return processors; }
/// Specify quotas and limits for every ISourceWithProgress.
void setLimits(const SourceWithProgress::LocalLimits & limits);
void setLimits(const StreamLocalLimits & limits);
void setQuota(const std::shared_ptr<const EnabledQuota> & quota);
/// Do not allow to change the table while the processors of pipe are alive.

View File

@ -56,8 +56,8 @@ void PartialSortingStep::transformPipeline(QueryPipeline & pipeline)
return std::make_shared<PartialSortingTransform>(header, sort_description, limit);
});
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_CURRENT;
StreamLocalLimits limits;
limits.mode = LimitsMode::LIMITS_CURRENT;
limits.size_limits = size_limits;
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr

View File

@ -13,8 +13,9 @@ namespace DB
ReadFromStorageStep::ReadFromStorageStep(
TableLockHolder table_lock_,
StorageMetadataPtr & metadata_snapshot_,
SelectQueryOptions options_,
StorageMetadataPtr metadata_snapshot_,
StreamLocalLimits & limits_,
std::shared_ptr<const EnabledQuota> quota_,
StoragePtr storage_,
const Names & required_columns_,
const SelectQueryInfo & query_info_,
@ -23,8 +24,9 @@ ReadFromStorageStep::ReadFromStorageStep(
size_t max_block_size_,
size_t max_streams_)
: table_lock(std::move(table_lock_))
, metadata_snapshot(metadata_snapshot_)
, options(std::move(options_))
, metadata_snapshot(std::move(metadata_snapshot_))
, limits(limits_)
, quota(std::move(quota_))
, storage(std::move(storage_))
, required_columns(required_columns_)
, query_info(query_info_)
@ -82,43 +84,10 @@ ReadFromStorageStep::ReadFromStorageStep(
/// Table lock is stored inside pipeline here.
pipeline->addTableLock(table_lock);
/// Set the limits and quota for reading data, the speed and time of the query.
{
const Settings & settings = context->getSettingsRef();
pipe.setLimits(limits);
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_TOTAL;
limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);
limits.speed_limits.max_execution_time = settings.max_execution_time;
limits.timeout_overflow_mode = settings.timeout_overflow_mode;
/** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
* because the initiating server has a summary of the execution of the request on all servers.
*
* But limits on data size to read and maximum execution time are reasonable to check both on initiator and
* additionally on each remote server, because these limits are checked per block of data processed,
* and remote servers may process way more blocks of data than are received by initiator.
*
* The limits to throttle maximum execution speed is also checked on all servers.
*/
if (options.to_stage == QueryProcessingStage::Complete)
{
limits.speed_limits.min_execution_rps = settings.min_execution_speed;
limits.speed_limits.min_execution_bps = settings.min_execution_speed_bytes;
}
limits.speed_limits.max_execution_rps = settings.max_execution_speed;
limits.speed_limits.max_execution_bps = settings.max_execution_speed_bytes;
limits.speed_limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
auto quota = context->getQuota();
if (!options.ignore_limits)
pipe.setLimits(limits);
if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
pipe.setQuota(quota);
}
if (quota)
pipe.setQuota(quota);
pipeline->init(std::move(pipe));

View File

@ -1,7 +1,7 @@
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Core/QueryProcessingStage.h>
#include <Storages/TableLockHolder.h>
#include <Interpreters/SelectQueryOptions.h>
#include <DataStreams/StreamLocalLimits.h>
namespace DB
{
@ -16,14 +16,17 @@ struct SelectQueryInfo;
struct PrewhereInfo;
class EnabledQuota;
/// Reads from storage.
class ReadFromStorageStep : public IQueryPlanStep
{
public:
ReadFromStorageStep(
TableLockHolder table_lock,
StorageMetadataPtr & metadata_snapshot,
SelectQueryOptions options,
StorageMetadataPtr metadata_snapshot,
StreamLocalLimits & limits,
std::shared_ptr<const EnabledQuota> quota,
StoragePtr storage,
const Names & required_columns,
const SelectQueryInfo & query_info,
@ -43,7 +46,8 @@ public:
private:
TableLockHolder table_lock;
StorageMetadataPtr metadata_snapshot;
SelectQueryOptions options;
StreamLocalLimits limits;
std::shared_ptr<const EnabledQuota> quota;
StoragePtr storage;
const Names & required_columns;

View File

@ -32,7 +32,7 @@ public:
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); }
/// Implementation for methods from ISourceWithProgress.
void setLimits(const LocalLimits & limits_) final { stream->setLimits(limits_); }
void setLimits(const StreamLocalLimits & limits_) final { stream->setLimits(limits_); }
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final { stream->setQuota(quota_); }
void setProcessListElement(QueryStatus * elem) final { stream->setProcessListElement(elem); }
void setProgressCallback(const ProgressCallback & callback) final { stream->setProgressCallback(callback); }

View File

@ -2,6 +2,7 @@
#include <Processors/ISource.h>
#include <DataStreams/IBlockInputStream.h>
#include <Common/Stopwatch.h>
#include <DataStreams/StreamLocalLimits.h>
namespace DB
{
@ -13,11 +14,8 @@ class ISourceWithProgress : public ISource
public:
using ISource::ISource;
using LocalLimits = IBlockInputStream::LocalLimits;
using LimitsMode = IBlockInputStream::LimitsMode;
/// Set limitations that checked on each chunk.
virtual void setLimits(const LocalLimits & limits_) = 0;
virtual void setLimits(const StreamLocalLimits & limits_) = 0;
/// Set the quota. If you set a quota on the amount of raw data,
/// then you should also set mode = LIMITS_TOTAL to LocalLimits with setLimits.
@ -47,10 +45,7 @@ public:
/// If enable_auto_progress flag is set, progress() will be automatically called on each generated chunk.
SourceWithProgress(Block header, bool enable_auto_progress);
using LocalLimits = IBlockInputStream::LocalLimits;
using LimitsMode = IBlockInputStream::LimitsMode;
void setLimits(const LocalLimits & limits_) final { limits = limits_; }
void setLimits(const StreamLocalLimits & limits_) final { limits = limits_; }
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final { quota = quota_; }
void setProcessListElement(QueryStatus * elem) final { process_list_elem = elem; }
void setProgressCallback(const ProgressCallback & callback) final { progress_callback = callback; }
@ -63,7 +58,7 @@ protected:
void work() override;
private:
LocalLimits limits;
StreamLocalLimits limits;
std::shared_ptr<const EnabledQuota> quota;
ProgressCallback progress_callback;
QueryStatus * process_list_elem = nullptr;

View File

@ -18,7 +18,7 @@ void ProcessorProfileInfo::update(const Chunk & block)
bytes += block.bytes();
}
LimitsCheckingTransform::LimitsCheckingTransform(const Block & header_, LocalLimits limits_)
LimitsCheckingTransform::LimitsCheckingTransform(const Block & header_, StreamLocalLimits limits_)
: ISimpleTransform(header_, header_, false)
, limits(std::move(limits_))
{

View File

@ -4,7 +4,7 @@
#include <Poco/Timespan.h>
#include <Interpreters/ProcessList.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/StreamLocalLimits.h>
namespace DB
{
@ -26,10 +26,7 @@ class LimitsCheckingTransform : public ISimpleTransform
{
public:
using LocalLimits = IBlockInputStream::LocalLimits;
using LimitsMode = IBlockInputStream::LimitsMode;
LimitsCheckingTransform(const Block & header_, LocalLimits limits_);
LimitsCheckingTransform(const Block & header_, StreamLocalLimits limits_);
String getName() const override { return "LimitsCheckingTransform"; }
@ -39,7 +36,7 @@ protected:
void transform(Chunk & chunk) override;
private:
LocalLimits limits;
StreamLocalLimits limits;
std::shared_ptr<const EnabledQuota> quota;
UInt64 prev_elapsed = 0;

View File

@ -7,6 +7,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Processors/Pipe.h>
#include <Processors/QueryPlan/ReadFromStorageStep.h>
#include <Interpreters/Context.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/quoteString.h>
@ -91,6 +92,27 @@ Pipe IStorage::read(
throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void IStorage::read(
QueryPlan & query_plan,
TableLockHolder table_lock,
StorageMetadataPtr metadata_snapshot,
StreamLocalLimits & limits,
std::shared_ptr<const EnabledQuota> quota,
const Names & column_names,
const SelectQueryInfo & query_info,
std::shared_ptr<Context> context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
auto read_step = std::make_unique<ReadFromStorageStep>(
std::move(table_lock), std::move(metadata_snapshot), limits, std::move(quota), shared_from_this(),
column_names, query_info, std::move(context), processed_stage, max_block_size, num_streams);
read_step->setStepDescription("Read from " + getName());
query_plan.addStep(std::move(read_step));
}
Pipe IStorage::alterPartition(
const ASTPtr & /* query */,
const StorageMetadataPtr & /* metadata_snapshot */,

View File

@ -48,6 +48,7 @@ using ProcessorPtr = std::shared_ptr<IProcessor>;
using Processors = std::vector<ProcessorPtr>;
class Pipe;
class QueryPlan;
class StoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
@ -280,6 +281,21 @@ public:
size_t /*max_block_size*/,
unsigned /*num_streams*/);
/// Another version of read() which adds a reading step to the query plan.
/// The default implementation creates a ReadFromStorageStep and uses the usual read().
virtual void read(
QueryPlan & query_plan,
TableLockHolder table_lock,
StorageMetadataPtr metadata_snapshot,
StreamLocalLimits & limits,
std::shared_ptr<const EnabledQuota> quota,
const Names & column_names,
const SelectQueryInfo & query_info,
std::shared_ptr<Context> context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams);
/** Writes the data to a table.
* Receives a description of the query, which can contain information about the data write method.
* Returns an object by which you can write data sequentially.

View File

@ -565,7 +565,7 @@ bool StorageKafka::streamToViews()
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL
IBlockInputStream::LocalLimits limits;
StreamLocalLimits limits;
limits.speed_limits.max_execution_time = kafka_settings->kafka_flush_interval_ms.changed
? kafka_settings->kafka_flush_interval_ms

View File

@ -752,7 +752,7 @@ bool StorageRabbitMQ::streamToViews()
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL
IBlockInputStream::LocalLimits limits;
StreamLocalLimits limits;
limits.speed_limits.max_execution_time = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms

View File

@ -380,8 +380,10 @@ StorageDistributed::StorageDistributed(
if (!relative_data_path.empty())
{
storage_policy = global_context->getStoragePolicy(storage_policy_name_);
if (storage_policy->getVolumes().size() != 1)
throw Exception("Storage policy for Distributed table, should have exactly one volume", ErrorCodes::BAD_ARGUMENTS);
data_volume = storage_policy->getVolume(0);
if (storage_policy->getVolumes().size() > 1)
LOG_WARNING(log, "Storage policy for Distributed table has multiple volumes. "
"Only {} volume will be used to store data. Other will be ignored.", data_volume->getName());
}
/// Sanity check. Skip check if the table is already created to allow the server to start.
@ -599,7 +601,7 @@ void StorageDistributed::startup()
if (!storage_policy)
return;
for (const DiskPtr & disk : storage_policy->getDisks())
for (const DiskPtr & disk : data_volume->getDisks())
createDirectoryMonitors(disk->getPath());
for (const String & path : getDataPaths())
@ -633,7 +635,7 @@ void StorageDistributed::drop()
LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on DROP TABLE");
auto disks = storage_policy->getDisks();
auto disks = data_volume->getDisks();
for (const auto & disk : disks)
disk->removeRecursive(relative_data_path);
@ -647,7 +649,7 @@ Strings StorageDistributed::getDataPaths() const
if (relative_data_path.empty())
return paths;
for (const DiskPtr & disk : storage_policy->getDisks())
for (const DiskPtr & disk : data_volume->getDisks())
paths.push_back(disk->getPath() + relative_data_path);
return paths;
@ -866,7 +868,7 @@ void StorageDistributed::rename(const String & new_path_to_table_data, const Sto
void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
{
for (const DiskPtr & disk : storage_policy->getDisks())
for (const DiskPtr & disk : data_volume->getDisks())
{
const String path(disk->getPath());
auto new_path = path + new_path_to_table_data;

View File

@ -183,6 +183,11 @@ protected:
/// Can be empty if relative_data_path is empty. In this case, a directory for the data to be sent is not created.
StoragePolicyPtr storage_policy;
/// The main volume to store data.
/// Storage policy may have several configured volumes, but second and other volumes are used for parts movement in MergeTree engine.
/// For Distributed engine such configuration doesn't make sense and only the first (main) volume will be used to store data.
/// Other volumes will be ignored. It's needed to allow using the same multi-volume policy both for Distributed and other engines.
VolumePtr data_volume;
struct ClusterNodeData
{

View File

@ -15,8 +15,7 @@ NamesAndTypesList StorageSystemUserDirectories::getNamesAndTypes()
NamesAndTypesList names_and_types{
{"name", std::make_shared<DataTypeString>()},
{"type", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
{"readonly", std::make_shared<DataTypeUInt8>()},
{"params", std::make_shared<DataTypeString>()},
{"precedence", std::make_shared<DataTypeUInt64>()},
};
return names_and_types;
@ -31,21 +30,18 @@ void StorageSystemUserDirectories::fillData(MutableColumns & res_columns, const
size_t column_index = 0;
auto & column_name = assert_cast<ColumnString &>(*res_columns[column_index++]);
auto & column_type = assert_cast<ColumnString &>(*res_columns[column_index++]);
auto & column_path = assert_cast<ColumnString &>(*res_columns[column_index++]);
auto & column_readonly = assert_cast<ColumnUInt8 &>(*res_columns[column_index++]);
auto & column_params = assert_cast<ColumnString &>(*res_columns[column_index++]);
auto & column_precedence = assert_cast<ColumnUInt64 &>(*res_columns[column_index++]);
auto add_row = [&](const IAccessStorage & storage, size_t precedence)
{
const String & name = storage.getStorageName();
std::string_view type = storage.getStorageType();
const String & path = storage.getStoragePath();
bool readonly = storage.isStorageReadOnly();
String params = storage.getStorageParamsJSON();
column_name.insertData(name.data(), name.length());
column_type.insertData(type.data(), type.length());
column_path.insertData(path.data(), path.length());
column_readonly.insert(readonly);
column_params.insertData(params.data(), params.length());
column_precedence.insert(precedence);
};

View File

@ -89,7 +89,7 @@ def test_backup_from_old_version_setting(started_cluster):
assert node2.query("SELECT sum(A) FROM dest_table") == "3\n"
assert node1.query("CHECK TABLE dest_table") == "1\n"
assert node2.query("CHECK TABLE dest_table") == "1\n"
def test_backup_from_old_version_config(started_cluster):
@ -128,7 +128,7 @@ def test_backup_from_old_version_config(started_cluster):
assert node3.query("SELECT sum(A) FROM dest_table") == "3\n"
assert node1.query("CHECK TABLE dest_table") == "1\n"
assert node3.query("CHECK TABLE dest_table") == "1\n"
def test_backup_and_alter(started_cluster):

View File

@ -71,11 +71,16 @@ def test_no_ttl_merges_in_busy_pool(started_cluster):
node1.query("SYSTEM START TTL MERGES")
rows_count = []
while count_running_mutations(node1, "test_ttl") == 6:
print "Mutations count after start TTL", count_running_mutations(node1, "test_ttl")
assert node1.query("SELECT count() FROM test_ttl") == "30\n"
rows_count.append(int(node1.query("SELECT count() FROM test_ttl").strip()))
time.sleep(0.5)
# for at least several seconds no TTL merges ran and the row count stayed
# equal to the original value
assert sum([1 for count in rows_count if count == 30]) > 4
assert_eq_with_retry(node1, "SELECT COUNT() FROM test_ttl", "0")

View File

@ -4,6 +4,7 @@ from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
import pymysql
import warnings
import time
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -243,6 +244,7 @@ def test_dictionary_with_where(started_cluster):
assert node1.query("SELECT dictGetString('default.special_dict', 'value1', toUInt64(2))") == 'qweqwe\n'
def test_clickhouse_remote(started_cluster):
with pytest.raises(QueryRuntimeException):
node3.query("""
@ -256,7 +258,9 @@ def test_clickhouse_remote(started_cluster):
SOURCE(CLICKHOUSE(HOST 'node4' PORT 9000 USER 'default' TABLE 'xml_dictionary_table' DB 'test'))
LIFETIME(MIN 1 MAX 10)
""")
node3.query("system reload dictionaries")
for i in range(5):
node3.query("system reload dictionary test.clickhouse_remote")
time.sleep(0.5)
node3.query("detach dictionary if exists test.clickhouse_remote")
node3.query("""
@ -272,4 +276,3 @@ def test_clickhouse_remote(started_cluster):
""")
node3.query("select dictGetUInt8('test.clickhouse_remote', 'SomeValue1', toUInt64(17))") == '17\n'

View File

@ -39,9 +39,12 @@ def start_cluster():
finally:
cluster.shutdown()
def test_work(start_cluster):
query = instance.query
instance.query("SYSTEM RELOAD DICTIONARIES")
assert query("SELECT dictGetString('test_file', 'first', toUInt64(1))") == "\\\'a\n"
assert query("SELECT dictGetString('test_file', 'second', toUInt64(1))") == "\"b\n"
assert query("SELECT dictGetString('test_executable', 'first', toUInt64(1))") == "\\\'a\n"
@ -58,4 +61,4 @@ def test_work(start_cluster):
assert caught_exception.find("Limit for result exceeded") != -1
assert query("SELECT dictGetString('test_http', 'first', toUInt64(1))") == "\\\'a\n"
assert query("SELECT dictGetString('test_http', 'second', toUInt64(1))") == "\"b\n"
assert query("SELECT dictGetString('test_http', 'second', toUInt64(1))") == "\"b\n"

View File

@ -350,7 +350,7 @@ def query_event_with_empty_transaction(clickhouse_node, mysql_node, service_name
mysql_node.query("INSERT INTO test_database.t1(a) VALUES(2)")
mysql_node.query("/* start */ commit /* end */")
check_query(clickhouse_node, "SELECT * FROM test_database.t1 ORDER BY a FORMAT TSV",
"1\tBEGIN\n2\tBEGIN\n")
check_query(clickhouse_node, "SHOW TABLES FROM test_database FORMAT TSV", "t1\n")
check_query(clickhouse_node, "SELECT * FROM test_database.t1 ORDER BY a FORMAT TSV", "1\tBEGIN\n2\tBEGIN\n")
clickhouse_node.query("DROP DATABASE test_database")
mysql_node.query("DROP DATABASE test_database")

View File

@ -24,40 +24,40 @@ def started_cluster():
def test_old_style():
node.copy_file_to_container(os.path.join(SCRIPT_DIR, "configs/old_style.xml"), '/etc/clickhouse-server/config.d/z.xml')
node.restart_clickhouse()
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", "/etc/clickhouse-server/users2.xml", 1, 1],
["local directory", "local directory", "/var/lib/clickhouse/access2/", 0, 2]])
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", '{"path":"\\\\/etc\\\\/clickhouse-server\\\\/users2.xml"}', 1],
["local directory", "local directory", '{"path":"\\\\/var\\\\/lib\\\\/clickhouse\\\\/access2\\\\/"}', 2]])
def test_local_directories():
node.copy_file_to_container(os.path.join(SCRIPT_DIR, "configs/local_directories.xml"), '/etc/clickhouse-server/config.d/z.xml')
node.restart_clickhouse()
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", "/etc/clickhouse-server/users3.xml", 1, 1],
["local directory", "local directory", "/var/lib/clickhouse/access3/", 0, 2],
["local directory (ro)", "local directory", "/var/lib/clickhouse/access3-ro/", 1, 3]])
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", '{"path":"\\\\/etc\\\\/clickhouse-server\\\\/users3.xml"}', 1],
["local directory", "local directory", '{"path":"\\\\/var\\\\/lib\\\\/clickhouse\\\\/access3\\\\/"}', 2],
["local directory (ro)", "local directory", '{"path":"\\\\/var\\\\/lib\\\\/clickhouse\\\\/access3-ro\\\\/","readonly":true}', 3]])
def test_relative_path():
node.copy_file_to_container(os.path.join(SCRIPT_DIR, "configs/relative_path.xml"), '/etc/clickhouse-server/config.d/z.xml')
node.restart_clickhouse()
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", "/etc/clickhouse-server/users4.xml", 1, 1]])
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", '{"path":"\\\\/etc\\\\/clickhouse-server\\\\/users4.xml"}', 1]])
def test_memory():
node.copy_file_to_container(os.path.join(SCRIPT_DIR, "configs/memory.xml"), '/etc/clickhouse-server/config.d/z.xml')
node.restart_clickhouse()
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", "/etc/clickhouse-server/users5.xml", 1, 1],
["memory", "memory", "", 0, 2]])
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", '{"path":"\\\\/etc\\\\/clickhouse-server\\\\/users5.xml"}', 1],
["memory", "memory", '{}', 2]])
def test_mixed_style():
node.copy_file_to_container(os.path.join(SCRIPT_DIR, "configs/mixed_style.xml"), '/etc/clickhouse-server/config.d/z.xml')
node.restart_clickhouse()
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", "/etc/clickhouse-server/users6.xml", 1, 1],
["local directory", "local directory", "/var/lib/clickhouse/access6/", 0, 2],
["local directory", "local directory", "/var/lib/clickhouse/access6a/", 0, 3],
["memory", "memory", "", 0, 4]])
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", '{"path":"\\\\/etc\\\\/clickhouse-server\\\\/users6.xml"}', 1],
["local directory", "local directory", '{"path":"\\\\/var\\\\/lib\\\\/clickhouse\\\\/access6\\\\/"}', 2],
["local directory", "local directory", '{"path":"\\\\/var\\\\/lib\\\\/clickhouse\\\\/access6a\\\\/"}', 3],
["memory", "memory", '{}', 4]])
def test_duplicates():
node.copy_file_to_container(os.path.join(SCRIPT_DIR, "configs/duplicates.xml"), '/etc/clickhouse-server/config.d/z.xml')
node.restart_clickhouse()
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", "/etc/clickhouse-server/users7.xml", 1, 1],
["local directory", "local directory", "/var/lib/clickhouse/access7/", 0, 2]])
assert node.query("SELECT * FROM system.user_directories") == TSV([["users.xml", "users.xml", '{"path":"\\\\/etc\\\\/clickhouse-server\\\\/users7.xml"}', 1],
["local directory", "local directory", '{"path":"\\\\/var\\\\/lib\\\\/clickhouse\\\\/access7\\\\/"}', 2]])

View File

@ -28,15 +28,12 @@ ${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_r1 DELETE WHERE d = '11'" 2>
# Delete some values
${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_r1 DELETE WHERE x % 2 = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_r1 DELETE WHERE s = 'd'"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_r1 DELETE WHERE m = 3"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_r1 DELETE WHERE m = 3 SETTINGS mutations_sync = 2"
# Insert more data
${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_r1(d, x, s) VALUES \
('2000-01-01', 5, 'e'), ('2000-02-01', 5, 'e')"
# Wait until the last mutation is done.
wait_for_mutation "mutations_r2" "0000000003"
# Check that the table contains only the data that should not be deleted.
${CLICKHOUSE_CLIENT} --query="SELECT d, x, s, m FROM mutations_r2 ORDER BY d, x"
# Check the contents of the system.mutations table.
@ -65,9 +62,7 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_cleaner_r1(x) VALUES (1), (2
# Add some mutations and wait for their execution
${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_cleaner_r1 DELETE WHERE x = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_cleaner_r1 DELETE WHERE x = 2"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_cleaner_r1 DELETE WHERE x = 3"
wait_for_mutation "mutations_cleaner_r2" "0000000002"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_cleaner_r1 DELETE WHERE x = 3 SETTINGS mutations_sync = 2"
# Add another mutation and prevent its execution on the second replica
${CLICKHOUSE_CLIENT} --query="SYSTEM STOP REPLICATION QUEUES mutations_cleaner_r2"