Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into pg-ch-replica

kssenii 2021-06-27 19:56:34 +00:00
commit aa7133c239
34 changed files with 568 additions and 155 deletions


@ -184,10 +184,27 @@ endif ()
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -rdynamic")
find_program (OBJCOPY_PATH NAMES "llvm-objcopy" "llvm-objcopy-12" "llvm-objcopy-11" "llvm-objcopy-10" "llvm-objcopy-9" "llvm-objcopy-8" "objcopy")
if (NOT OBJCOPY_PATH AND OS_DARWIN)
find_program (BREW_PATH NAMES "brew")
if (BREW_PATH)
execute_process (COMMAND ${BREW_PATH} --prefix llvm ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE LLVM_PREFIX)
if (LLVM_PREFIX)
find_program (OBJCOPY_PATH NAMES "llvm-objcopy" PATHS "${LLVM_PREFIX}/bin" NO_DEFAULT_PATH)
endif ()
if (NOT OBJCOPY_PATH)
execute_process (COMMAND ${BREW_PATH} --prefix binutils ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE BINUTILS_PREFIX)
if (BINUTILS_PREFIX)
find_program (OBJCOPY_PATH NAMES "objcopy" PATHS "${BINUTILS_PREFIX}/bin" NO_DEFAULT_PATH)
endif ()
endif ()
endif ()
endif ()
if (OBJCOPY_PATH)
message(STATUS "Using objcopy: ${OBJCOPY_PATH}.")
message (STATUS "Using objcopy: ${OBJCOPY_PATH}")
else ()
message(FATAL_ERROR "Cannot find objcopy.")
message (FATAL_ERROR "Cannot find objcopy.")
endif ()
if (OS_DARWIN)


@ -33,51 +33,25 @@ macro(clickhouse_embed_binaries)
message(FATAL_ERROR "The list of binary resources to embed may not be empty")
endif()
# If cross-compiling, ensure we use the toolchain file and target the actual target architecture
if (CMAKE_CROSSCOMPILING)
set(CROSS_COMPILE_FLAGS --target=${CMAKE_C_COMPILER_TARGET})
# FIXME: find a way to properly pass all cross-compile flags to custom command in CMake
if (CMAKE_SYSTEM_NAME STREQUAL "Darwin")
list(APPEND CROSS_COMPILE_FLAGS -isysroot ${CMAKE_OSX_SYSROOT} -mmacosx-version-min=${CMAKE_OSX_DEPLOYMENT_TARGET})
else ()
list(APPEND CROSS_COMPILE_FLAGS -isysroot ${CMAKE_SYSROOT})
endif ()
else()
set(CROSS_COMPILE_FLAGS "")
endif()
add_library("${EMBED_TARGET}" STATIC)
set_target_properties("${EMBED_TARGET}" PROPERTIES LINKER_LANGUAGE C)
set(EMBED_TEMPLATE_FILE "${PROJECT_SOURCE_DIR}/programs/embed_binary.S.in")
set(RESOURCE_OBJS)
foreach(RESOURCE_FILE ${EMBED_RESOURCES})
set(RESOURCE_OBJ "${RESOURCE_FILE}.o")
list(APPEND RESOURCE_OBJS "${RESOURCE_OBJ}")
# Normalize the name of the resource
foreach(RESOURCE_FILE ${EMBED_RESOURCES})
set(ASSEMBLY_FILE_NAME "${RESOURCE_FILE}.S")
set(BINARY_FILE_NAME "${RESOURCE_FILE}")
# Normalize the name of the resource.
string(REGEX REPLACE "[\./-]" "_" SYMBOL_NAME "${RESOURCE_FILE}") # - must be last in regex
string(REPLACE "+" "_PLUS_" SYMBOL_NAME "${SYMBOL_NAME}")
set(ASSEMBLY_FILE_NAME "${RESOURCE_FILE}.S")
# Put the configured assembly file in the output directory.
# This is so we can clean it up as usual, and we CD to the
# source directory before compiling, so that the assembly
# `.incbin` directive can find the file.
# Generate the configured assembly file in the output directory.
configure_file("${EMBED_TEMPLATE_FILE}" "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" @ONLY)
# Generate the output object file by compiling the assembly, in the directory of
# the sources so that the resource file may also be found
add_custom_command(
OUTPUT ${RESOURCE_OBJ}
COMMAND cd "${EMBED_RESOURCE_DIR}" &&
${CMAKE_C_COMPILER} "${CROSS_COMPILE_FLAGS}" -c -o
"${CMAKE_CURRENT_BINARY_DIR}/${RESOURCE_OBJ}"
"${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}"
COMMAND_EXPAND_LISTS
)
set_source_files_properties("${RESOURCE_OBJ}" PROPERTIES EXTERNAL_OBJECT true GENERATED true)
endforeach()
# Set the include directory for relative paths specified for `.incbin` directive.
set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY INCLUDE_DIRECTORIES "${EMBED_RESOURCE_DIR}")
add_library("${EMBED_TARGET}" STATIC ${RESOURCE_OBJS})
set_target_properties("${EMBED_TARGET}" PROPERTIES LINKER_LANGUAGE C)
target_sources("${EMBED_TARGET}" PRIVATE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}")
endforeach()
endmacro()


@ -561,7 +561,7 @@ if args.report == 'main':
# Don't show mildly unstable queries, only the very unstable ones we
# treat as errors.
if very_unstable_queries:
if very_unstable_queries > 3:
if very_unstable_queries > 5:
error_tests += very_unstable_queries
status = 'failure'
message_array.append(str(very_unstable_queries) + ' unstable')


@ -33,7 +33,7 @@ Reboot.
``` bash
brew update
brew install cmake ninja libtool gettext llvm gcc
brew install cmake ninja libtool gettext llvm gcc binutils
```
## Checkout ClickHouse Sources {#checkout-clickhouse-sources}


@ -1803,6 +1803,27 @@ Possible values:
Default value: 0.
## distributed_directory_monitor_split_batch_on_failure {#distributed_directory_monitor_split_batch_on_failure}
Enables/disables splitting batches on failures.
Sometimes sending a particular batch to the remote shard may fail because of a complex pipeline behind the insert (e.g. a `MATERIALIZED VIEW` with `GROUP BY`) hitting `Memory limit exceeded` or similar errors. In this case retrying will not help (and it will leave distributed sends for the table stuck), but sending the files from that batch one by one may allow the INSERT to succeed.
So setting this option to `1` disables batching for such failed batches (i.e. `distributed_directory_monitor_batch_inserts` is temporarily disabled for them).
Possible values:
- 1 — Enabled.
- 0 — Disabled.
Default value: 0.
!!! note "Note"
This setting also affects broken batches (which may appear after an abnormal server (machine) termination when `fsync_after_insert`/`fsync_directories` are not enabled for the [Distributed](../../engines/table-engines/special/distributed.md) table engine).
!!! warning "Warning"
You should not rely on automatic batch splitting, since this may hurt performance.
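For reference (not part of this change), whether the new setting is enabled on a server can be checked with a simple query against `system.settings`:
``` sql
SELECT name, value, changed
FROM system.settings
WHERE name IN ('distributed_directory_monitor_batch_inserts',
               'distributed_directory_monitor_split_batch_on_failure');
```
Note that the patch reads the setting from the storage context when the directory monitor is created, which is why the integration tests added in this commit enable it via the user profile rather than per query.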
## os_thread_priority {#setting-os-thread-priority}
Sets the priority ([nice](https://en.wikipedia.org/wiki/Nice_(Unix))) for threads that execute queries. The OS scheduler considers this priority when choosing the next thread to run on each available CPU core.
@ -3145,4 +3166,17 @@ SETTINGS index_granularity = 8192 │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
[Original article](https://clickhouse.tech/docs/en/operations/settings/settings/) <!-- hide -->
## external_table_functions_use_nulls {#external-table-functions-use-nulls}
Defines how the [mysql](../../sql-reference/table-functions/mysql.md), [postgresql](../../sql-reference/table-functions/postgresql.md) and [odbc](../../sql-reference/table-functions/odbc.md) table functions use Nullable columns.
Possible values:
- 0 — The table function explicitly uses Nullable columns.
- 1 — The table function implicitly uses Nullable columns.
Default value: `1`.
**Usage**
If the setting is set to `0`, the table function does not use Nullable columns and inserts default values instead of NULL. This also applies to NULL values inside arrays.
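As an illustrative sketch (the connection parameters and table name below are hypothetical, following the pattern used in the integration tests in this commit), the effect of the setting can be observed by overriding it per query:
``` sql
-- With the default value (1) the column type is reported as Nullable;
-- with 0 it is not, and NULL values are replaced by default values.
SELECT toTypeName(a)
FROM postgresql('postgres1:5432', 'clickhouse', 'test_table', 'postgres', 'mysecretpassword')
SETTINGS external_table_functions_use_nulls = 0;
```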


@ -3023,4 +3023,17 @@ SETTINGS index_granularity = 8192 │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
[Оригинальная статья](https://clickhouse.tech/docs/ru/operations/settings/settings/) <!--hide-->
## external_table_functions_use_nulls {#external-table-functions-use-nulls}
Defines how the [mysql](../../sql-reference/table-functions/mysql.md), [postgresql](../../sql-reference/table-functions/postgresql.md) and [odbc](../../sql-reference/table-functions/odbc.md) table functions use Nullable columns.
Possible values:
- 0 — The table function explicitly uses Nullable columns.
- 1 — The table function implicitly uses Nullable columns.
Default value: `1`.
**Usage**
If the setting is set to `0`, the table function does not use Nullable columns and inserts default values of the scalar type instead of NULL. This also applies to NULL values inside arrays.


@ -1,4 +1,4 @@
# 设置 {#set}
# 集合 {#set}
A data set that is always kept in RAM. It is intended for use on the right side of the IN operator (see the section «IN operators»).
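A brief illustration (not part of this change; the table and values are made up) of how a Set engine table is used on the right side of IN:
``` sql
CREATE TABLE watched_ids (id UInt64) ENGINE = Set;
INSERT INTO watched_ids VALUES (1), (2), (3);

-- A Set table is always kept in RAM and can only appear on the right side of IN.
SELECT number FROM numbers(10) WHERE number IN watched_ids;
```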


@ -555,6 +555,8 @@
M(585, CANNOT_PARSE_YAML) \
M(586, CANNOT_CREATE_FILE) \
M(587, CONCURRENT_ACCESS_NOT_SUPPORTED) \
M(588, DISTRIBUTED_BROKEN_BATCH_INFO) \
M(589, DISTRIBUTED_BROKEN_BATCH_FILES) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \


@ -90,6 +90,7 @@ class IColumn;
M(Milliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \
\
M(Bool, distributed_directory_monitor_batch_inserts, false, "Should StorageDistributed DirectoryMonitors try to batch individual inserts into bigger ones.", 0) \
M(Bool, distributed_directory_monitor_split_batch_on_failure, false, "Should StorageDistributed DirectoryMonitors try to split batch into smaller in case of failures.", 0) \
\
M(Bool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.", 0) \
\


@ -25,6 +25,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
extern const int FILE_ALREADY_EXISTS;
extern const int INCORRECT_QUERY;
extern const int ABORTED;
}
class AtomicDatabaseTablesSnapshotIterator final : public DatabaseTablesSnapshotIterator
@ -418,7 +419,18 @@ void DatabaseAtomic::loadStoredObjects(ContextMutablePtr local_context, bool has
{
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
if (has_force_restore_data_flag)
fs::remove_all(path_to_table_symlinks);
{
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
{
if (!fs::is_symlink(table_path))
{
throw Exception(ErrorCodes::ABORTED,
"'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path.path()));
}
fs::remove(table_path);
}
}
DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);


@ -40,14 +40,19 @@ std::unordered_set<std::string> fetchPostgreSQLTablesList(T & tx)
}
static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable = false, uint16_t dimensions = 0)
static DataTypePtr convertPostgreSQLDataType(String & type, const std::function<void()> & recheck_array, bool is_nullable = false, uint16_t dimensions = 0)
{
DataTypePtr res;
bool is_array = false;
/// Get rid of trailing '[]' for arrays
if (dimensions)
if (type.ends_with("[]"))
{
is_array = true;
while (type.ends_with("[]"))
type.resize(type.size() - 2);
}
if (type == "smallint")
res = std::make_shared<DataTypeInt16>();
@ -103,8 +108,24 @@ static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable = f
res = std::make_shared<DataTypeString>();
if (is_nullable)
res = std::make_shared<DataTypeNullable>(res);
while (dimensions--)
res = std::make_shared<DataTypeArray>(res);
if (is_array)
{
/// In some cases attndims does not return the correct number of dimensions
/// (it might incorrectly return 0, for example, when a postgres table is created via 'as select * from table_with_arrays').
/// So recheck all arrays separately afterwards. (We cannot check it here on the same connection, because another query is already being executed.)
if (!dimensions)
{
/// Return 1d array type and recheck all arrays dims with array_ndims
res = std::make_shared<DataTypeArray>(res);
recheck_array();
}
else
{
while (dimensions--)
res = std::make_shared<DataTypeArray>(res);
}
}
return res;
}
@ -114,34 +135,61 @@ template<typename T>
std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
T & tx, const String & postgres_table_name, const String & query, bool use_nulls, bool only_names_and_types)
{
auto columns = NamesAndTypesList();
auto columns = NamesAndTypes();
try
{
auto stream{pqxx::stream_from::query(tx, query)};
std::set<size_t> recheck_arrays_indexes;
{
auto stream{pqxx::stream_from::query(tx, query)};
if (only_names_and_types)
{
std::tuple<std::string, std::string> row;
while (stream >> row)
columns.push_back(NameAndTypePair(std::get<0>(row), convertPostgreSQLDataType(std::get<1>(row))));
}
else
{
std::tuple<std::string, std::string, std::string, uint16_t> row;
while (stream >> row)
size_t i = 0;
auto recheck_array = [&]() { recheck_arrays_indexes.insert(i); };
if (only_names_and_types)
{
columns.push_back(NameAndTypePair(
std::get<0>(row), /// column name
convertPostgreSQLDataType(
std::get<1>(row), /// data type
use_nulls && (std::get<2>(row) == "f"), /// 'f' means that postgres `not_null` is false == nullable
std::get<3>(row)))); /// number of dimensions if data type is array
std::tuple<std::string, std::string> row;
while (stream >> row)
{
columns.push_back(NameAndTypePair(std::get<0>(row), convertPostgreSQLDataType(std::get<1>(row), recheck_array)));
++i;
}
}
else
{
std::tuple<std::string, std::string, std::string, uint16_t> row;
while (stream >> row)
{
auto data_type = convertPostgreSQLDataType(std::get<1>(row),
recheck_array,
use_nulls && (std::get<2>(row) == "f"), /// 'f' means that postgres `not_null` is false, i.e. value is nullable
std::get<3>(row));
columns.push_back(NameAndTypePair(std::get<0>(row), data_type));
++i;
}
}
stream.complete();
}
stream.complete();
for (const auto & i : recheck_arrays_indexes)
{
const auto & name_and_type = columns[i];
/// All rows must contain the same number of dimensions, so LIMIT 1 is ok. If the number of dimensions differs between rows,
/// such arrays cannot be used as a ClickHouse Array at all.
pqxx::result result{tx.exec(fmt::format("SELECT array_ndims({}) FROM {} LIMIT 1", name_and_type.name, postgres_table_name))};
auto dimensions = result[0][0].as<int>();
/// It is always 1d array if it is in recheck.
DataTypePtr type = assert_cast<const DataTypeArray *>(name_and_type.type.get())->getNestedType();
while (dimensions--)
type = std::make_shared<DataTypeArray>(type);
columns[i] = NameAndTypePair(name_and_type.name, type);
}
}
catch (const pqxx::undefined_table &)
{
throw Exception(ErrorCodes::UNKNOWN_TABLE, "PostgreSQL table {} does not exist", postgres_table_name);
@ -152,7 +200,7 @@ std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
throw;
}
return !columns.empty() ? std::make_shared<NamesAndTypesList>(columns) : nullptr;
return !columns.empty() ? std::make_shared<NamesAndTypesList>(columns.begin(), columns.end()) : nullptr;
}


@ -394,7 +394,7 @@ struct ContextSharedPart
/// Clusters for distributed tables
/// Initialized on demand (on distributed storages initialization) since Settings should be initialized
std::unique_ptr<Clusters> clusters;
std::shared_ptr<Clusters> clusters;
ConfigurationPtr clusters_config; /// Stores updated configs
mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config
@ -1882,7 +1882,7 @@ std::optional<UInt16> Context::getTCPPortSecure() const
std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) const
{
auto res = getClusters().getCluster(cluster_name);
auto res = getClusters()->getCluster(cluster_name);
if (res)
return res;
@ -1896,7 +1896,7 @@ std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) c
std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name) const
{
return getClusters().getCluster(cluster_name);
return getClusters()->getCluster(cluster_name);
}
@ -1911,7 +1911,7 @@ void Context::reloadClusterConfig() const
}
const auto & config = cluster_config ? *cluster_config : getConfigRef();
auto new_clusters = std::make_unique<Clusters>(config, settings);
auto new_clusters = std::make_shared<Clusters>(config, settings);
{
std::lock_guard lock(shared->clusters_mutex);
@ -1927,16 +1927,16 @@ void Context::reloadClusterConfig() const
}
Clusters & Context::getClusters() const
std::shared_ptr<Clusters> Context::getClusters() const
{
std::lock_guard lock(shared->clusters_mutex);
if (!shared->clusters)
{
const auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef();
shared->clusters = std::make_unique<Clusters>(config, settings);
shared->clusters = std::make_shared<Clusters>(config, settings);
}
return *shared->clusters;
return shared->clusters;
}


@ -679,7 +679,7 @@ public:
void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);
DDLWorker & getDDLWorker() const;
Clusters & getClusters() const;
std::shared_ptr<Clusters> getClusters() const;
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
void setClustersConfig(const ConfigurationPtr & config, const String & config_name = "remote_servers");


@ -52,6 +52,15 @@ namespace ErrorCodes
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int EMPTY_DATA_PASSED;
extern const int INCORRECT_FILE_NAME;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int DISTRIBUTED_BROKEN_BATCH_INFO;
extern const int DISTRIBUTED_BROKEN_BATCH_FILES;
extern const int TOO_MANY_PARTS;
extern const int TOO_MANY_BYTES;
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int TOO_MANY_PARTITIONS;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
}
@ -227,9 +236,27 @@ namespace
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|| code == ErrorCodes::UNKNOWN_CODEC
|| code == ErrorCodes::CANNOT_DECOMPRESS
|| code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO
|| code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES
|| (!remote_error && code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
}
/// Can the batch be split so that the files from it are sent one by one instead?
bool isSplittableErrorCode(int code, bool remote)
{
return code == ErrorCodes::MEMORY_LIMIT_EXCEEDED
/// FunctionRange::max_elements and similar
|| code == ErrorCodes::ARGUMENT_OUT_OF_BOUND
|| code == ErrorCodes::TOO_MANY_PARTS
|| code == ErrorCodes::TOO_MANY_BYTES
|| code == ErrorCodes::TOO_MANY_ROWS_OR_BYTES
|| code == ErrorCodes::TOO_MANY_PARTITIONS
|| code == ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES
|| code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO
|| isFileBrokenErrorCode(code, remote)
;
}
SyncGuardPtr getDirectorySyncGuard(bool dir_fsync, const DiskPtr & disk, const String & path)
{
if (dir_fsync)
@ -319,6 +346,7 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, relative_path(relative_path_)
, path(fs::path(disk->getPath()) / relative_path / "")
, should_batch_inserts(storage.getContext()->getSettingsRef().distributed_directory_monitor_batch_inserts)
, split_batch_on_failure(storage.getContext()->getSettingsRef().distributed_directory_monitor_split_batch_on_failure)
, dir_fsync(storage.getDistributedSettingsRef().fsync_directories)
, min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes)
@ -642,6 +670,7 @@ struct StorageDistributedDirectoryMonitor::Batch
StorageDistributedDirectoryMonitor & parent;
const std::map<UInt64, String> & file_index_to_path;
bool split_batch_on_failure = true;
bool fsync = false;
bool dir_fsync = false;
@ -650,6 +679,7 @@ struct StorageDistributedDirectoryMonitor::Batch
const std::map<UInt64, String> & file_index_to_path_)
: parent(parent_)
, file_index_to_path(file_index_to_path_)
, split_batch_on_failure(parent.split_batch_on_failure)
, fsync(parent.storage.getDistributedSettingsRef().fsync_after_insert)
, dir_fsync(parent.dir_fsync)
{}
@ -703,37 +733,23 @@ struct StorageDistributedDirectoryMonitor::Batch
auto connection = parent.pool->get(timeouts);
bool batch_broken = false;
bool batch_marked_as_broken = false;
try
{
std::unique_ptr<RemoteBlockOutputStream> remote;
for (UInt64 file_idx : file_indices)
try
{
auto file_path = file_index_to_path.find(file_idx);
if (file_path == file_index_to_path.end())
{
LOG_ERROR(parent.log, "Failed to send batch: file with index {} is absent", file_idx);
batch_broken = true;
break;
}
ReadBufferFromFile in(file_path->second);
const auto & distributed_header = readDistributedHeader(in, parent.log);
if (!remote)
{
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,
distributed_header.client_info);
remote->writePrefix();
}
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log);
sendBatch(*connection, timeouts);
}
catch (const Exception & e)
{
if (split_batch_on_failure && isSplittableErrorCode(e.code(), e.isRemoteException()))
{
tryLogCurrentException(parent.log, "Trying to split batch due to");
sendSeparateFiles(*connection, timeouts);
}
else
throw;
}
if (remote)
remote->writeSuffix();
}
catch (Exception & e)
{
@ -741,6 +757,8 @@ struct StorageDistributedDirectoryMonitor::Batch
{
tryLogCurrentException(parent.log, "Failed to send batch due to");
batch_broken = true;
if (!e.isRemoteException() && e.code() == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES)
batch_marked_as_broken = true;
}
else
{
@ -761,7 +779,7 @@ struct StorageDistributedDirectoryMonitor::Batch
for (UInt64 file_index : file_indices)
parent.markAsSend(file_index_to_path.at(file_index));
}
else
else if (!batch_marked_as_broken)
{
LOG_ERROR(parent.log, "Marking a batch of {} files as broken.", file_indices.size());
@ -797,6 +815,78 @@ struct StorageDistributedDirectoryMonitor::Batch
}
recovered = true;
}
private:
void sendBatch(Connection & connection, const ConnectionTimeouts & timeouts)
{
std::unique_ptr<RemoteBlockOutputStream> remote;
for (UInt64 file_idx : file_indices)
{
auto file_path = file_index_to_path.find(file_idx);
if (file_path == file_index_to_path.end())
throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO,
"Failed to send batch: file with index {} is absent", file_idx);
ReadBufferFromFile in(file_path->second);
const auto & distributed_header = readDistributedHeader(in, parent.log);
if (!remote)
{
remote = std::make_unique<RemoteBlockOutputStream>(connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,
distributed_header.client_info);
remote->writePrefix();
}
bool compression_expected = connection.getCompression() == Protocol::Compression::Enable;
writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log);
}
if (remote)
remote->writeSuffix();
}
void sendSeparateFiles(Connection & connection, const ConnectionTimeouts & timeouts)
{
size_t broken_files = 0;
for (UInt64 file_idx : file_indices)
{
auto file_path = file_index_to_path.find(file_idx);
if (file_path == file_index_to_path.end())
{
LOG_ERROR(parent.log, "Failed to send one file from batch: file with index {} is absent", file_idx);
++broken_files;
continue;
}
try
{
ReadBufferFromFile in(file_path->second);
const auto & distributed_header = readDistributedHeader(in, parent.log);
RemoteBlockOutputStream remote(connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,
distributed_header.client_info);
remote.writePrefix();
bool compression_expected = connection.getCompression() == Protocol::Compression::Enable;
writeRemoteConvert(distributed_header, remote, compression_expected, in, parent.log);
remote.writeSuffix();
}
catch (Exception & e)
{
e.addMessage(fmt::format("While sending {}", file_path->second));
parent.maybeMarkAsBroken(file_path->second, e);
++broken_files;
}
}
if (broken_files)
throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES,
"Failed to send {} files", broken_files);
}
};
class DirectoryMonitorBlockInputStream : public IBlockInputStream


@ -92,6 +92,7 @@ private:
std::string path;
const bool should_batch_inserts = false;
const bool split_batch_on_failure = true;
const bool dir_fsync = false;
const size_t min_batched_block_size_rows = 0;
const size_t min_batched_block_size_bytes = 0;


@ -752,7 +752,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
for (const auto & dir_name : dir_names)
{
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name, /* startup= */ false);
directory_monitor.addAndSchedule(file_size, sleep_ms.totalMilliseconds());
}
}


@ -49,7 +49,6 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_QUERY;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW;
@ -82,9 +81,8 @@ static StorageID extractDependentTable(ASTPtr & query, ContextPtr context, const
{
auto * ast_select = subquery->as<ASTSelectWithUnionQuery>();
if (!ast_select)
throw Exception("Logical error while creating StorageLiveView."
" Could not retrieve table name from select query.",
DB::ErrorCodes::LOGICAL_ERROR);
throw Exception("LIVE VIEWs are only supported for queries from tables, but there is no table name in select query.",
DB::ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
if (ast_select->list_of_selects->children.size() != 1)
throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);


@ -800,12 +800,33 @@ void StorageDistributed::startup()
if (!storage_policy)
return;
for (const DiskPtr & disk : data_volume->getDisks())
createDirectoryMonitors(disk);
const auto & disks = data_volume->getDisks();
for (const String & path : getDataPaths())
/// Make initialization for a large number of disks parallel.
ThreadPool pool(disks.size());
for (const DiskPtr & disk : disks)
{
pool.scheduleOrThrowOnError([&]()
{
createDirectoryMonitors(disk);
});
}
pool.wait();
const auto & paths = getDataPaths();
std::vector<UInt64> last_increment(paths.size());
for (size_t i = 0; i < paths.size(); ++i)
{
pool.scheduleOrThrowOnError([&, i]()
{
last_increment[i] = getMaximumFileNumber(paths[i]);
});
}
pool.wait();
for (const auto inc : last_increment)
{
UInt64 inc = getMaximumFileNumber(path);
if (inc > file_names_increment.value)
file_names_increment.value.store(inc);
}
@ -907,30 +928,50 @@ void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk)
}
else
{
requireDirectoryMonitor(disk, dir_path.filename().string());
requireDirectoryMonitor(disk, dir_path.filename().string(), /* startup= */ true);
}
}
}
}
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name)
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup)
{
const std::string & disk_path = disk->getPath();
const std::string key(disk_path + name);
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
if (!node_data.directory_monitor)
auto create_node_data = [&]()
{
node_data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
ClusterNodeData data;
data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
*this, disk, relative_data_path + name,
node_data.connection_pool,
data.connection_pool,
monitors_blocker,
getContext()->getDistributedSchedulePool());
return data;
};
/// In case of startup the lock can be acquired later.
if (startup)
{
auto tmp_node_data = create_node_data();
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
assert(!node_data.directory_monitor);
node_data = std::move(tmp_node_data);
return *node_data.directory_monitor;
}
else
{
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
if (!node_data.directory_monitor)
{
node_data = create_node_data();
}
return *node_data.directory_monitor;
}
return *node_data.directory_monitor;
}
std::vector<StorageDistributedDirectoryMonitor::Status> StorageDistributed::getDirectoryMonitorsStatuses() const


@ -160,7 +160,7 @@ private:
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors(const DiskPtr & disk);
/// ensure directory monitor thread and connection pool creation by disk and subdirectory name
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name);
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup);
/// Return list of metrics for all created monitors
/// (note that monitors are created lazily, i.e. until at least one INSERT executed)


@ -31,7 +31,7 @@ NamesAndTypesList StorageSystemClusters::getNamesAndTypes()
void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
for (const auto & name_and_cluster : context->getClusters().getContainer())
for (const auto & name_and_cluster : context->getClusters()->getContainer())
writeCluster(res_columns, name_and_cluster);
const auto databases = DatabaseCatalog::instance().getDatabases();


@ -130,8 +130,8 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, Context
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
zk_exception_code = code;
const auto & clusters = context->getClusters();
for (const auto & name_and_cluster : clusters.getContainer())
const auto clusters = context->getClusters();
for (const auto & name_and_cluster : clusters->getContainer())
{
const ClusterPtr & cluster = name_and_cluster.second;
const auto & shards_info = cluster->getShardsInfo();


@ -0,0 +1,15 @@
<yandex>
<profiles>
<default>
<!-- always send via network -->
<prefer_localhost_replica>0</prefer_localhost_replica>
<!-- enable batching to check splitting -->
<distributed_directory_monitor_batch_inserts>1</distributed_directory_monitor_batch_inserts>
<!-- override defaults just in case they will be changed -->
<distributed_directory_monitor_split_batch_on_failure>1</distributed_directory_monitor_split_batch_on_failure>
<!-- wait for explicit flush -->
<distributed_directory_monitor_sleep_time_ms>86400</distributed_directory_monitor_sleep_time_ms>
<distributed_directory_monitor_max_sleep_time_ms>86400</distributed_directory_monitor_max_sleep_time_ms>
</default>
</profiles>
</yandex>


@ -0,0 +1,15 @@
<yandex>
<profiles>
<default>
<!-- always send via network -->
<prefer_localhost_replica>0</prefer_localhost_replica>
<!-- enable batching to check splitting -->
<distributed_directory_monitor_batch_inserts>1</distributed_directory_monitor_batch_inserts>
<!-- disable -->
<distributed_directory_monitor_split_batch_on_failure>0</distributed_directory_monitor_split_batch_on_failure>
<!-- wait for explicit flush -->
<distributed_directory_monitor_sleep_time_ms>86400</distributed_directory_monitor_sleep_time_ms>
<distributed_directory_monitor_max_sleep_time_ms>86400</distributed_directory_monitor_max_sleep_time_ms>
</default>
</profiles>
</yandex>


@ -0,0 +1,18 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>


@ -0,0 +1,60 @@
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
# node1 -- distributed_directory_monitor_split_batch_on_failure=on
node1 = cluster.add_instance('node1',
main_configs=['configs/remote_servers.xml'],
user_configs=['configs/overrides_1.xml'],
)
# node2 -- distributed_directory_monitor_split_batch_on_failure=off
node2 = cluster.add_instance('node2',
main_configs=['configs/remote_servers.xml'],
user_configs=['configs/overrides_2.xml'],
)
@pytest.fixture(scope='module')
def started_cluster():
try:
cluster.start()
for _, node in cluster.instances.items():
node.query("""
create table null_ (key Int, value Int) engine=Null();
create table dist as null_ engine=Distributed(test_cluster, currentDatabase(), null_, key);
create table data (key Int, uniq_values Int) engine=Memory();
create materialized view mv to data as select key, uniqExact(value) uniq_values from null_ group by key;
system stop distributed sends dist;
create table dist_data as data engine=Distributed(test_cluster, currentDatabase(), data);
""")
yield cluster
finally:
cluster.shutdown()
def test_distributed_directory_monitor_split_batch_on_failure_OFF(started_cluster):
for i in range(0, 100):
limit = 100e3
node2.query(f'insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}', settings={
# max_memory_usage is the limit for the batch on the remote node
# (local query should not be affected since 30MB is enough for 100K rows)
'max_memory_usage': '30Mi',
})
# "Received from" is mandatory, since the exception should be thrown on the remote node.
with pytest.raises(QueryRuntimeException, match=r'DB::Exception: Received from.*Memory limit \(for query\) exceeded: .*while pushing to view default\.mv'):
node2.query('system flush distributed dist')
assert int(node2.query('select count() from dist_data')) == 0
def test_distributed_directory_monitor_split_batch_on_failure_ON(started_cluster):
for i in range(0, 100):
limit = 100e3
node1.query(f'insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}', settings={
# max_memory_usage is the limit for the batch on the remote node
# (local query should not be affected since 30MB is enough for 100K rows)
'max_memory_usage': '30Mi',
})
node1.query('system flush distributed dist')
assert int(node1.query('select count() from dist_data')) == 100000


@ -0,0 +1,32 @@
<yandex>
<remote_servers>
<insert_distributed_async_send_cluster_two_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
</shard>
</insert_distributed_async_send_cluster_two_replicas>
<insert_distributed_async_send_cluster_two_shards>
<shard>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
</shard>
</insert_distributed_async_send_cluster_two_shards>
</remote_servers>
</yandex>


@ -0,0 +1,7 @@
<yandex>
<profiles>
<default>
<distributed_directory_monitor_split_batch_on_failure>1</distributed_directory_monitor_split_batch_on_failure>
</default>
</profiles>
</yandex>


@ -17,11 +17,29 @@ n1 = cluster.add_instance('n1', main_configs=['configs/remote_servers.xml'], use
# n2 -- distributed_directory_monitor_batch_inserts=0
n2 = cluster.add_instance('n2', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users.d/no_batch.xml'])
# n3 -- distributed_directory_monitor_batch_inserts=1/distributed_directory_monitor_split_batch_on_failure=1
n3 = cluster.add_instance('n3', main_configs=['configs/remote_servers_split.xml'], user_configs=[
'configs/users.d/batch.xml',
'configs/users.d/split.xml',
])
# n4 -- distributed_directory_monitor_batch_inserts=0/distributed_directory_monitor_split_batch_on_failure=1
n4 = cluster.add_instance('n4', main_configs=['configs/remote_servers_split.xml'], user_configs=[
'configs/users.d/no_batch.xml',
'configs/users.d/split.xml',
])
batch_params = pytest.mark.parametrize('batch', [
(1),
(0),
])
batch_and_split_params = pytest.mark.parametrize('batch,split', [
(1, 0),
(0, 0),
(1, 1),
(0, 1),
])
@pytest.fixture(scope='module', autouse=True)
def start_cluster():
try:
@ -62,15 +80,19 @@ def insert_data(node):
assert size > 1<<16
return size
def get_node(batch):
def get_node(batch, split=None):
if split:
if batch:
return n3
return n4
if batch:
return n1
return n2
def bootstrap(batch):
def bootstrap(batch, split=None):
drop_tables()
create_tables('insert_distributed_async_send_cluster_two_replicas')
return insert_data(get_node(batch))
return insert_data(get_node(batch, split))
def get_path_to_dist_batch(file='2.bin'):
# There are:
@ -80,8 +102,8 @@ def get_path_to_dist_batch(file='2.bin'):
# @return the file for the n2 shard
return f'/var/lib/clickhouse/data/default/dist/shard1_replica2/{file}'
def check_dist_after_corruption(truncate, batch):
node = get_node(batch)
def check_dist_after_corruption(truncate, batch, split=None):
node = get_node(batch, split)
if batch:
# In batch mode errors are ignored
@ -102,8 +124,12 @@ def check_dist_after_corruption(truncate, batch):
broken = get_path_to_dist_batch('broken')
node.exec_in_container(['bash', '-c', f'ls {broken}/2.bin'])
assert int(n1.query('SELECT count() FROM data')) == 10000
assert int(n2.query('SELECT count() FROM data')) == 0
if split:
assert int(n3.query('SELECT count() FROM data')) == 10000
assert int(n4.query('SELECT count() FROM data')) == 0
else:
assert int(n1.query('SELECT count() FROM data')) == 10000
assert int(n2.query('SELECT count() FROM data')) == 0
@batch_params
@ -114,17 +140,17 @@ def test_insert_distributed_async_send_success(batch):
assert int(n1.query('SELECT count() FROM data')) == 10000
assert int(n2.query('SELECT count() FROM data')) == 10000
@batch_params
def test_insert_distributed_async_send_truncated_1(batch):
size = bootstrap(batch)
@batch_and_split_params
def test_insert_distributed_async_send_truncated_1(batch, split):
size = bootstrap(batch, split)
path = get_path_to_dist_batch()
node = get_node(batch)
node = get_node(batch, split)
new_size = size - 10
# we cannot use truncate, due to hardlinks
node.exec_in_container(['bash', '-c', f'mv {path} /tmp/bin && head -c {new_size} /tmp/bin > {path}'])
check_dist_after_corruption(True, batch)
check_dist_after_corruption(True, batch, split)
@batch_params
def test_insert_distributed_async_send_truncated_2(batch):


@ -307,7 +307,7 @@ def test_postgres_distributed(started_cluster):
started_cluster.unpause_container('postgres1')
assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n')
def test_datetime_with_timezone(started_cluster):
conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
@ -323,6 +323,22 @@ def test_datetime_with_timezone(started_cluster):
assert(node1.query("select * from test_timezone") == "2014-04-04 20:00:00\t2014-04-04 16:00:00\n")
def test_postgres_ndim(started_cluster):
conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
cursor.execute('CREATE TABLE arr1 (a Integer[])')
cursor.execute("INSERT INTO arr1 SELECT '{{1}, {2}}'")
# The point is to create the table via 'as select *'; in this case attndims in postgres will not be correct.
cursor.execute('CREATE TABLE arr2 AS SELECT * FROM arr1')
cursor.execute("SELECT attndims AS dims FROM pg_attribute WHERE attrelid = 'arr2'::regclass; ")
result = cursor.fetchall()[0]
assert(int(result[0]) == 0)
result = node1.query('''SELECT toTypeName(a) FROM postgresql('postgres1:5432', 'clickhouse', 'arr2', 'postgres', 'mysecretpassword')''')
assert(result.strip() == "Array(Array(Nullable(Int32)))")
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")


@ -1,13 +0,0 @@
<test>
<preconditions>
<table_exists>trips_mergetree</table_exists>
</preconditions>
<query>SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type</query>
<query>SELECT passenger_count, avg(total_amount) FROM trips_mergetree GROUP BY passenger_count</query>
<query>SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree GROUP BY passenger_count, year</query>
<query>SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*) FROM trips_mergetree GROUP BY passenger_count, year, distance ORDER BY year, count(*) DESC</query>
</test>


@ -1,6 +1,7 @@
<test>
<preconditions>
<table_exists>hits_100m_single</table_exists>
<table_exists>hits_10m_single</table_exists>
<ram_size>30000000000</ram_size>
</preconditions>
@ -58,4 +59,5 @@
</substitutions>
<query>SELECT {key} AS k, {func}(UserID) FROM hits_100m_single GROUP BY k FORMAT Null</query>
<query>SELECT {key} AS k, uniqTheta(UserID) FROM hits_10m_single GROUP BY k FORMAT Null</query>
</test>


@ -249,3 +249,5 @@
01824_prefer_global_in_and_join
01576_alias_column_rewrite
01924_argmax_bitmap_state
01914_exchange_dictionaries
01923_different_expression_name_alias


@ -112,7 +112,8 @@
"00738_lock_for_inner_table",
"01153_attach_mv_uuid",
/// Sometimes cannot lock file most likely due to concurrent or adjacent tests, but we don't care how it works in Ordinary database.
"rocksdb"
"rocksdb",
"01914_exchange_dictionaries" /// Requires Atomic database
],
"database-replicated": [
/// Unclassified
@ -519,7 +520,8 @@
"01924_argmax_bitmap_state",
"01913_replace_dictionary",
"01914_exchange_dictionaries",
"01915_create_or_replace_dictionary"
"01915_create_or_replace_dictionary",
"01913_names_of_tuple_literal"
],
"parallel":
[