Merge remote-tracking branch 'origin/master' into use-iobject-storage-for-table-engines-1

kssenii 2024-04-28 12:19:32 +02:00
commit 7a416f2e76
102 changed files with 753 additions and 363 deletions

View File

@ -448,6 +448,14 @@ jobs:
test_name: Stateless tests (debug)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatelessTestAsanAzure:
needs: [RunConfig, BuilderDebAsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateless tests (azure, asan)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
##############################################################################################
############################ FUNCTIONAL STATEFUL TESTS #######################################
##############################################################################################
@ -598,6 +606,14 @@ jobs:
test_name: Stress test (tsan)
runner_type: stress-tester
data: ${{ needs.RunConfig.outputs.data }}
StressTestTsanAzure:
needs: [RunConfig, BuilderDebTsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stress test (azure, tsan)
runner_type: stress-tester
data: ${{ needs.RunConfig.outputs.data }}
StressTestMsan:
needs: [RunConfig, BuilderDebMsan]
if: ${{ !failure() && !cancelled() }}

View File

@ -16,7 +16,7 @@
#ci_set_reduced
#ci_set_arm
#ci_set_integration
#ci_set_analyzer
#ci_set_old_analyzer
## To run specified job in CI:
#job_<JOB NAME>

View File

@ -8,9 +8,6 @@ option (SANITIZE "Enable one of the code sanitizers" "")
set (SAN_FLAGS "${SAN_FLAGS} -g -fno-omit-frame-pointer -DSANITIZER")
# It's possible to pass an ignore list to sanitizers (-fsanitize-ignorelist). Intentionally not doing this because
# 1. out-of-source suppressions are awkward 2. it seems ignore lists don't work after the Clang v16 upgrade (#49829)
if (SANITIZE)
if (SANITIZE STREQUAL "address")
set (ASAN_FLAGS "-fsanitize=address -fsanitize-address-use-after-scope")

contrib/curl vendored

@ -1 +1 @@
Subproject commit 1a05e833f8f7140628b27882b10525fd9ec4b873
Subproject commit de7b3e89218467159a7af72d58cea8425946e97d

View File

@ -33,14 +33,15 @@ set (SRCS
"${LIBRARY_DIR}/lib/curl_memrchr.c"
"${LIBRARY_DIR}/lib/curl_multibyte.c"
"${LIBRARY_DIR}/lib/curl_ntlm_core.c"
"${LIBRARY_DIR}/lib/curl_ntlm_wb.c"
"${LIBRARY_DIR}/lib/curl_path.c"
"${LIBRARY_DIR}/lib/curl_range.c"
"${LIBRARY_DIR}/lib/curl_rtmp.c"
"${LIBRARY_DIR}/lib/curl_sasl.c"
"${LIBRARY_DIR}/lib/curl_sha512_256.c"
"${LIBRARY_DIR}/lib/curl_sspi.c"
"${LIBRARY_DIR}/lib/curl_threads.c"
"${LIBRARY_DIR}/lib/curl_trc.c"
"${LIBRARY_DIR}/lib/cw-out.c"
"${LIBRARY_DIR}/lib/dict.c"
"${LIBRARY_DIR}/lib/doh.c"
"${LIBRARY_DIR}/lib/dynbuf.c"
@ -98,6 +99,7 @@ set (SRCS
"${LIBRARY_DIR}/lib/psl.c"
"${LIBRARY_DIR}/lib/rand.c"
"${LIBRARY_DIR}/lib/rename.c"
"${LIBRARY_DIR}/lib/request.c"
"${LIBRARY_DIR}/lib/rtsp.c"
"${LIBRARY_DIR}/lib/select.c"
"${LIBRARY_DIR}/lib/sendf.c"

View File

@ -38,6 +38,7 @@
#define HAVE_ARPA_INET_H
#define HAVE_ERRNO_H
#define HAVE_GETSOCKNAME
#define HAVE_FCNTL_H
#define HAVE_NETDB_H
#define HAVE_NETINET_IN_H

View File

@ -19,7 +19,7 @@ ln -s /usr/share/clickhouse-test/clickhouse-test /usr/bin/clickhouse-test
# install test configs
/usr/share/clickhouse-test/config/install.sh
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log &
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --silent --inMemoryPersistence &
./setup_minio.sh stateful
config_logs_export_cluster /etc/clickhouse-server/config.d/system_logs_export.yaml
@ -87,7 +87,7 @@ function start()
tail -n1000 /var/log/clickhouse-server/clickhouse-server.log
break
fi
timeout 120 service clickhouse-server start
timeout 120 sudo -E -u clickhouse /usr/bin/clickhouse-server --config /etc/clickhouse-server/config.xml --daemon --pid-file /var/run/clickhouse-server/clickhouse-server.pid
sleep 0.5
counter=$((counter + 1))
done

View File

@ -42,12 +42,6 @@ source /utils.lib
# install test configs
/usr/share/clickhouse-test/config/install.sh
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
echo "Azure is disabled"
else
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log &
fi
./setup_minio.sh stateless
./setup_hdfs_minicluster.sh
@ -97,12 +91,11 @@ if [ "$NUM_TRIES" -gt "1" ]; then
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US_MAX=10000
mkdir -p /var/run/clickhouse-server
# simplest way to forward env variables to server
sudo -E -u clickhouse /usr/bin/clickhouse-server --config /etc/clickhouse-server/config.xml --daemon --pid-file /var/run/clickhouse-server/clickhouse-server.pid
else
sudo clickhouse start
fi
# simplest way to forward env variables to server
sudo -E -u clickhouse /usr/bin/clickhouse-server --config /etc/clickhouse-server/config.xml --daemon --pid-file /var/run/clickhouse-server/clickhouse-server.pid
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
sudo sed -i "s|<filesystem_caches_path>/var/lib/clickhouse/filesystem_caches/</filesystem_caches_path>|<filesystem_caches_path>/var/lib/clickhouse/filesystem_caches_1/</filesystem_caches_path>|" /etc/clickhouse-server1/config.d/filesystem_caches_path.xml
@ -212,6 +205,14 @@ function run_tests()
ADDITIONAL_OPTIONS+=('--s3-storage')
fi
if [[ -n "$USE_AZURE_STORAGE_FOR_MERGE_TREE" ]] && [[ "$USE_AZURE_STORAGE_FOR_MERGE_TREE" -eq 1 ]]; then
# reuse the --s3-storage flag so the same tests are disabled as for s3 storage
ADDITIONAL_OPTIONS+=('--s3-storage')
# azurite is slow, but with these two settings it can be super slow
ADDITIONAL_OPTIONS+=('--no-random-settings')
ADDITIONAL_OPTIONS+=('--no-random-merge-tree-settings')
fi
if [[ -n "$USE_SHARED_CATALOG" ]] && [[ "$USE_SHARED_CATALOG" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--shared-catalog')
fi
@ -286,7 +287,7 @@ stop_logs_replication
failed_to_save_logs=0
for table in query_log zookeeper_log trace_log transactions_info_log metric_log
do
err=$( { clickhouse-client -q "select * from system.$table format TSVWithNamesAndTypes" | zstd --threads=0 > /test_output/$table.tsv.zst; } 2>&1 )
err=$(clickhouse-client -q "select * from system.$table into outfile '/test_output/$table.tsv.gz' format TSVWithNamesAndTypes")
echo "$err"
[[ "0" != "${#err}" ]] && failed_to_save_logs=1
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then

View File

@ -279,7 +279,7 @@ function check_logs_for_critical_errors()
function collect_query_and_trace_logs()
{
for table in query_log trace_log
for table in query_log trace_log metric_log
do
clickhouse-local --config-file=/etc/clickhouse-server/config.xml --only-system-tables -q "select * from system.$table format TSVWithNamesAndTypes" | zstd --threads=0 > /test_output/$table.tsv.zst ||:
done

View File

@ -52,7 +52,6 @@ export ZOOKEEPER_FAULT_INJECTION=1
# available for dump via clickhouse-local
configure
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log &
./setup_minio.sh stateless # to have a proper environment
config_logs_export_cluster /etc/clickhouse-server/config.d/system_logs_export.yaml

View File

@ -4,7 +4,7 @@ sidebar_position: 30
sidebar_label: Replicated
---
# [experimental] Replicated
# Replicated
The engine is based on the [Atomic](../../engines/database-engines/atomic.md) engine. It supports replication of metadata via a DDL log that is written to ZooKeeper and executed on all replicas of a given database.
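For illustration, a minimal sketch of how such a database is created (the ZooKeeper path, shard name, and replica name are placeholders; any DDL query executed in the database is recorded in the ZooKeeper DDL log and replayed on the other replicas):
``` sql
-- Arguments: ZooKeeper path, shard name, replica name (all illustrative).
CREATE DATABASE db ENGINE = Replicated('/clickhouse/databases/db', 'shard_1', 'replica_1');
```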

View File

@ -8,6 +8,8 @@ sidebar_label: HDFS
This engine provides integration with the [Apache Hadoop](https://en.wikipedia.org/wiki/Apache_Hadoop) ecosystem by allowing data on [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) to be managed via ClickHouse. This engine is similar to the [File](../../../engines/table-engines/special/file.md#table_engines-file) and [URL](../../../engines/table-engines/special/url.md#table_engines-url) engines, but provides Hadoop-specific features.
This feature is not supported by ClickHouse engineers and is known to be of questionable quality. If you run into problems, fix them yourself and submit a pull request.
## Usage {#usage}
``` sql

View File

@ -327,7 +327,9 @@ Use buffering to avoid situations where a query processing error occurred after
## Setting a role with query parameters {#setting-role-with-query-parameters}
In certain scenarios, it might be required to set the granted role first, before executing the statement itself.
This is a new feature added in ClickHouse 24.4.
In specific scenarios, setting the granted role first might be required before executing the statement itself.
However, it is not possible to send `SET ROLE` and the statement together, as multi-statements are not allowed:
```
@ -346,7 +348,7 @@ To overcome this limitation, you could use the `role` query parameter instead:
curl -sS "http://localhost:8123?role=my_role" --data-binary "SELECT * FROM my_table;"
```
This will be an equivalent of executing `SET ROLE my_role` before the statement.
This will be the equivalent of executing `SET ROLE my_role` before the statement.
Additionally, it is possible to specify multiple `role` query parameters:

View File

@ -7,6 +7,8 @@ toc_max_heading_level: 2
# Core Settings
All of the settings below are also available in the [system.settings](/docs/en/operations/system-tables/settings) table.
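For example, a setting's current value and description can be looked up there (a minimal sketch; the setting name below is just one example):
``` sql
-- Inspect a single setting in system.settings.
SELECT name, value, changed, description
FROM system.settings
WHERE name = 'additional_table_filters';
```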
## additional_table_filters
An additional filter expression that is applied after reading
@ -3931,19 +3933,6 @@ For example, `avg(if(cond, col, null))` can be rewritten to `avgOrNullIf(cond, c
Supported only with experimental analyzer (`allow_experimental_analyzer = 1`).
:::
## allow_experimental_database_replicated {#allow_experimental_database_replicated}
Allows creating databases with the [Replicated](../../engines/database-engines/replicated.md) engine.
Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: `0`.
Cloud default value: `1`.
## database_replicated_initial_query_timeout_sec {#database_replicated_initial_query_timeout_sec}
Sets how long the initial DDL query should wait for the Replicated database to process previous DDL queue entries, in seconds.

View File

@ -3447,17 +3447,6 @@ SELECT
FROM fuse_tbl
```
## allow_experimental_database_replicated {#allow_experimental_database_replicated}
Allows creating databases with the [Replicated](../../engines/database-engines/replicated.md) engine.
Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: `0`.
## database_replicated_initial_query_timeout_sec {#database_replicated_initial_query_timeout_sec}
Sets how long the initial DDL query should wait for the Replicated database to process previous DDL queue entries, in seconds.

View File

@ -918,11 +918,13 @@ bool Client::processWithFuzzing(const String & full_query)
}
void Client::printHelpMessage(const OptionsDescription & options_description)
void Client::printHelpMessage(const OptionsDescription & options_description, bool verbose)
{
std::cout << options_description.main_description.value() << "\n";
std::cout << options_description.external_description.value() << "\n";
std::cout << options_description.hosts_and_ports_description.value() << "\n";
if (verbose)
std::cout << "All settings are documented at https://clickhouse.com/docs/en/operations/settings/settings.\n\n";
std::cout << "In addition, --param_name=value can be specified for substitution of parameters for parametrized queries.\n";
std::cout << "\nSee also: https://clickhouse.com/docs/en/integrations/sql-clients/cli\n";
}

View File

@ -25,7 +25,7 @@ protected:
String getName() const override { return "client"; }
void printHelpMessage(const OptionsDescription & options_description) override;
void printHelpMessage(const OptionsDescription & options_description, bool verbose) override;
void addOptions(OptionsDescription & options_description) override;

View File

@ -774,10 +774,12 @@ void LocalServer::processConfig()
}
void LocalServer::printHelpMessage([[maybe_unused]] const OptionsDescription & options_description)
void LocalServer::printHelpMessage(const OptionsDescription & options_description, bool verbose)
{
std::cout << getHelpHeader() << "\n";
std::cout << options_description.main_description.value() << "\n";
if (verbose)
std::cout << "All settings are documented at https://clickhouse.com/docs/en/operations/settings/settings.\n\n";
std::cout << getHelpFooter() << "\n";
std::cout << "In addition, --param_name=value can be specified for substitution of parameters for parametrized queries.\n";
std::cout << "\nSee also: https://clickhouse.com/docs/en/operations/utilities/clickhouse-local/\n";

View File

@ -36,7 +36,7 @@ protected:
String getName() const override { return "local"; }
void printHelpMessage(const OptionsDescription & options_description) override;
void printHelpMessage(const OptionsDescription & options_description, bool verbose) override;
void addOptions(OptionsDescription & options_description) override;

View File

@ -231,7 +231,8 @@ std::unique_ptr<WriteBuffer> BackupWriterAzureBlobStorage::writeFile(const Strin
key,
DBMS_DEFAULT_BUFFER_SIZE,
write_settings,
settings);
settings,
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
}
void BackupWriterAzureBlobStorage::removeFile(const String & file_name)

View File

@ -109,7 +109,7 @@ RestorerFromBackup::~RestorerFromBackup()
if (getNumFutures() > 0)
{
LOG_INFO(log, "Waiting for {} tasks to finish", getNumFutures());
waitFutures();
waitFutures(/* throw_if_error= */ false);
}
}
@ -161,7 +161,7 @@ void RestorerFromBackup::run(Mode mode)
setStage(Stage::COMPLETED);
}
void RestorerFromBackup::waitFutures()
void RestorerFromBackup::waitFutures(bool throw_if_error)
{
std::exception_ptr error;
@ -176,11 +176,7 @@ void RestorerFromBackup::waitFutures()
if (futures_to_wait.empty())
break;
/// Wait for all tasks.
for (auto & future : futures_to_wait)
future.wait();
/// Check if there is an exception.
/// Wait for all tasks to finish.
for (auto & future : futures_to_wait)
{
try
@ -197,7 +193,12 @@ void RestorerFromBackup::waitFutures()
}
if (error)
std::rethrow_exception(error);
{
if (throw_if_error)
std::rethrow_exception(error);
else
tryLogException(error, log);
}
}
size_t RestorerFromBackup::getNumFutures() const

View File

@ -130,7 +130,7 @@ private:
/// Waits until all tasks are processed (including the tasks scheduled while we're waiting).
/// Throws an exception if any of the tasks throws an exception.
void waitFutures();
void waitFutures(bool throw_if_error = true);
/// Throws an exception if the RESTORE query was cancelled.
void checkIsQueryCancelled() const;

View File

@ -2955,7 +2955,8 @@ void ClientBase::init(int argc, char ** argv)
/// Common options for clickhouse-client and clickhouse-local.
options_description.main_description->add_options()
("help", "produce help message")
("help", "print usage summary, combine with --verbose to display all options")
("verbose", "print query and other debugging info")
("version,V", "print version information and exit")
("version-clean", "print version in machine-readable format and exit")
@ -2979,7 +2980,6 @@ void ClientBase::init(int argc, char ** argv)
("time,t", "print query execution time to stderr in non-interactive mode (for benchmarks)")
("echo", "in batch mode, print query before execution")
("verbose", "print query and other debugging info")
("log-level", po::value<std::string>(), "log level")
("server_logs_file", po::value<std::string>(), "put server logs into specified file")
@ -3008,6 +3008,8 @@ void ClientBase::init(int argc, char ** argv)
addOptions(options_description);
OptionsDescription options_description_non_verbose = options_description;
auto getter = [](const auto & op)
{
String op_long_name = op->long_name();
@ -3042,11 +3044,17 @@ void ClientBase::init(int argc, char ** argv)
exit(0); // NOLINT(concurrency-mt-unsafe)
}
if (options.count("verbose"))
config().setBool("verbose", true);
/// Output of help message.
if (options.count("help")
|| (options.count("host") && options["host"].as<std::string>() == "elp")) /// If user writes -help instead of --help.
{
printHelpMessage(options_description);
if (config().getBool("verbose", false))
printHelpMessage(options_description, true);
else
printHelpMessage(options_description_non_verbose, false);
exit(0); // NOLINT(concurrency-mt-unsafe)
}
@ -3113,8 +3121,6 @@ void ClientBase::init(int argc, char ** argv)
config().setBool("highlight", options["highlight"].as<bool>());
if (options.count("history_file"))
config().setString("history_file", options["history_file"].as<std::string>());
if (options.count("verbose"))
config().setBool("verbose", true);
if (options.count("interactive"))
config().setBool("interactive", true);
if (options.count("pager"))

View File

@ -121,7 +121,7 @@ protected:
};
virtual void updateLoggerLevel(const String &) {}
virtual void printHelpMessage(const OptionsDescription & options_description) = 0;
virtual void printHelpMessage(const OptionsDescription & options_description, bool verbose) = 0;
virtual void addOptions(OptionsDescription & options_description) = 0;
virtual void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options,

View File

@ -24,6 +24,7 @@ public:
void updateHash(SipHash & hash) const override;
protected:
/// 1 byte (`gcd_bytes_size` value) + 1 byte (`bytes_to_skip` value) + `bytes_to_skip` bytes (trash) + `gcd_bytes_size` bytes (gcd value) + (`source_size` - `bytes_to_skip`) bytes (data)
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
@ -54,7 +55,7 @@ UInt32 CompressionCodecGCD::getMaxCompressedDataSize(UInt32 uncompressed_size) c
{
return uncompressed_size
+ gcd_bytes_size // To store gcd
+ 2; // Local header
+ 2; // Values of `gcd_bytes_size` and `bytes_to_skip`
}
uint8_t CompressionCodecGCD::getMethodByte() const
@ -147,7 +148,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest,
if (source_size - sizeof(T) != output_size)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress GCD-encoded data");
memcpy(dest, source, source_size);
memcpy(dest, source, source_size - sizeof(T));
return;
}
@ -160,6 +161,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest,
source += sizeof(T);
dest += sizeof(T);
}
chassert(source == source_end);
}
}
@ -209,6 +211,8 @@ void CompressionCodecGCD::doDecompressData(const char * source, UInt32 source_si
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress GCD-encoded data. File has wrong header");
UInt8 bytes_to_skip = uncompressed_size % bytes_size;
chassert(bytes_to_skip == static_cast<UInt8>(source[1]));
UInt32 output_size = uncompressed_size - bytes_to_skip;
if (static_cast<UInt32>(2 + bytes_to_skip) > source_size)

View File

@ -112,6 +112,7 @@ class IColumn;
M(Bool, azure_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in azure engine tables", 0) \
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, azure_allow_parallel_part_upload, true, "Use multiple threads for azure multipart upload.", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
M(Bool, hdfs_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
M(Bool, azure_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
@ -712,7 +713,6 @@ class IColumn;
M(Bool, engine_file_skip_empty_files, false, "Allows to skip empty files in file table engine", 0) \
M(Bool, engine_url_skip_empty_files, false, "Allows to skip empty files in url table engine", 0) \
M(Bool, enable_url_encoding, true, " Allows to enable/disable decoding/encoding path in uri in URL table engine", 0) \
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to process previous DDL queue entries", 0) \
M(Bool, database_replicated_enforce_synchronous_settings, false, "Enforces synchronous waiting for some queries (see also database_atomic_wait_for_drop_and_detach_synchronously, mutation_sync, alter_sync). Not recommended to enable these settings.", 0) \
M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \
@ -939,6 +939,7 @@ class IColumn;
MAKE_OBSOLETE(M, Bool, allow_experimental_query_cache, true) \
MAKE_OBSOLETE(M, Bool, allow_experimental_alter_materialized_view_structure, true) \
MAKE_OBSOLETE(M, Bool, allow_experimental_shared_merge_tree, true) \
MAKE_OBSOLETE(M, Bool, allow_experimental_database_replicated, true) \
\
MAKE_OBSOLETE(M, Milliseconds, async_insert_stale_timeout_ms, 0) \
MAKE_OBSOLETE(M, StreamingHandleErrorMode, handle_kafka_error_mode, StreamingHandleErrorMode::DEFAULT) \

View File

@ -92,12 +92,14 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"input_format_json_ignore_unnecessary_fields", false, true, "Ignore unnecessary fields and not parse them. Enabling this may not throw exceptions on json strings of invalid format or with duplicated fields"},
{"input_format_hive_text_allow_variable_number_of_columns", false, true, "Ignore extra columns in Hive Text input (if file has more columns than expected) and treat missing fields in Hive Text input as default values."},
{"first_day_of_week", "Monday", "Monday", "Added a setting for the first day of the week for date/time functions"},
{"allow_experimental_database_replicated", false, true, "Database engine Replicated is now in Beta stage"},
{"temporary_data_in_cache_reserve_space_wait_lock_timeout_milliseconds", (10 * 60 * 1000), (10 * 60 * 1000), "Wait time to lock cache for sapce reservation in temporary data in filesystem cache"},
{"hdfs_throw_on_zero_files_match", false, false, "Throw an error, when ListObjects request cannot match any files"},
{"azure_throw_on_zero_files_match", false, false, "Throw an error, when ListObjects request cannot match any files"},
{"s3_validate_request_settings", true, true, "Validate S3 request settings"},
{"azure_skip_empty_files", false, false, "Allow to skip empty files in azure table engine"},
}},
{"azure_allow_parallel_part_upload", "true", "true", "Use multiple threads for azure multipart upload."},
}},
{"24.3", {{"s3_connect_timeout_ms", 1000, 1000, "Introduce new dedicated setting for s3 connection timeout"},
{"allow_experimental_shared_merge_tree", false, true, "The setting is obsolete"},
{"use_page_cache_for_disks_without_file_cache", false, false, "Added userspace page cache"},

View File

@ -1139,8 +1139,10 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
}
LOG_INFO(log, "All tables are created successfully");
chassert(max_log_ptr_at_creation || our_log_ptr);
UInt32 first_entry_to_mark_finished = new_replica ? max_log_ptr_at_creation : our_log_ptr;
/// NOTE first_entry_to_mark_finished can be 0 if our replica has crashed just after creating its nodes in ZK,
/// so it's a new replica, but after restarting we don't know max_log_ptr_at_creation anymore...
/// It's a very rare case, and it's okay if some queries throw TIMEOUT_EXCEEDED when waiting for all replicas
if (first_entry_to_mark_finished)
{
/// If the replica is new and some of the queries applied during recovery

View File

@ -154,6 +154,7 @@ void IDisk::copyThroughBuffers(
/// Disable parallel write. We already copy in parallel.
/// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage
write_settings.s3_allow_parallel_part_upload = false;
write_settings.azure_allow_parallel_part_upload = false;
asyncCopy(*this, from_path, *to_disk, to_path, copying_thread_pool, results, copy_root_dir, read_settings, write_settings, cancellation_hook);

View File

@ -282,12 +282,17 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
LOG_TEST(log, "Writing file: {}", object.remote_path);
ThreadPoolCallbackRunnerUnsafe<void> scheduler;
if (write_settings.azure_allow_parallel_part_upload)
scheduler = threadPoolCallbackRunnerUnsafe<void>(getThreadPoolWriter(), "VFSWrite");
return std::make_unique<WriteBufferFromAzureBlobStorage>(
client.get(),
object.remote_path,
buf_size,
patchSettings(write_settings),
settings.get());
settings.get(),
std::move(scheduler));
}
void AzureObjectStorage::removeObjectImpl(const StoredObject & object, const SharedAzureClientPtr & client_ptr, bool if_exists)

View File

@ -23,6 +23,7 @@ struct WriteSettings
size_t filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = 1000;
bool s3_allow_parallel_part_upload = true;
bool azure_allow_parallel_part_upload = true;
/// Monitoring
bool for_object_storage = false; // to choose which profile events should be incremented

View File

@ -245,11 +245,15 @@ void executeQuery(
const auto & shard_info = cluster->getShardsInfo()[i];
auto query_for_shard = query_info.query_tree->clone();
if (sharding_key_expr && query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
if (sharding_key_expr &&
query_info.optimized_cluster &&
settings.optimize_skip_unused_shards_rewrite_in &&
shards > 1 &&
/// TODO: support composite sharding key
sharding_key_expr->getRequiredColumns().size() == 1)
{
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name,
shard_info,
not_optimized_cluster->getSlotToShard(),
@ -282,11 +286,15 @@ void executeQuery(
const auto & shard_info = cluster->getShardsInfo()[i];
ASTPtr query_ast_for_shard = query_info.query->clone();
if (sharding_key_expr && query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
if (sharding_key_expr &&
query_info.optimized_cluster &&
settings.optimize_skip_unused_shards_rewrite_in &&
shards > 1 &&
/// TODO: support composite sharding key
sharding_key_expr->getRequiredColumns().size() == 1)
{
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name,
shard_info,
not_optimized_cluster->getSlotToShard(),

View File

@ -5256,6 +5256,7 @@ WriteSettings Context::getWriteSettings() const
res.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
res.s3_allow_parallel_part_upload = settings.s3_allow_parallel_part_upload;
res.azure_allow_parallel_part_upload = settings.azure_allow_parallel_part_upload;
res.remote_throttler = getRemoteWriteThrottler();
res.local_throttler = getLocalWriteThrottler();

View File

@ -258,15 +258,6 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
"Enable allow_experimental_database_materialized_mysql to use it");
}
if (create.storage->engine->name == "Replicated"
&& !getContext()->getSettingsRef().allow_experimental_database_replicated
&& !internal && !create.attach)
{
throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE,
"Replicated is an experimental database engine. "
"Enable allow_experimental_database_replicated to use it");
}
if (create.storage->engine->name == "MaterializedPostgreSQL"
&& !getContext()->getSettingsRef().allow_experimental_database_materialized_postgresql
&& !internal && !create.attach)

View File

@ -38,25 +38,27 @@ Field executeFunctionOnField(
return (*ret.column)[0];
}
/// @param sharding_column_value - one of values from IN
/// @param column_value - one of values from IN
/// @param sharding_column_name - name of that column
/// @return true if shard may contain such value (or it is unknown), otherwise false.
bool shardContains(
Field sharding_column_value,
Field column_value,
const std::string & sharding_column_name,
const OptimizeShardingKeyRewriteInMatcher::Data & data)
{
/// Type of column in storage (used for implicit conversion from i.e. String to Int)
const DataTypePtr & column_type = data.sharding_key_expr->getSampleBlock().getByName(sharding_column_name).type;
/// Implicit conversion.
sharding_column_value = convertFieldToType(sharding_column_value, *data.sharding_key_type);
column_value = convertFieldToType(column_value, *column_type);
/// NULL is not allowed in sharding key,
/// so it should be safe to assume that shard cannot contain it.
if (sharding_column_value.isNull())
if (column_value.isNull())
return false;
Field sharding_value = executeFunctionOnField(
sharding_column_value, sharding_column_name,
data.sharding_key_expr, data.sharding_key_type,
column_value, sharding_column_name,
data.sharding_key_expr, column_type,
data.sharding_key_column_name);
/// The value from IN can be non-numeric,
/// but in this case it should be convertible to numeric type, let's try.

View File

@ -28,8 +28,6 @@ struct OptimizeShardingKeyRewriteInMatcher
{
/// Expression of sharding_key for the Distributed() table
const ExpressionActionsPtr & sharding_key_expr;
/// Type of sharding_key column.
const DataTypePtr & sharding_key_type;
/// Name of the column for sharding_expr
const std::string & sharding_key_column_name;
/// Info for the current shard (to compare shard_num with calculated)

View File

@ -357,6 +357,16 @@ bool ParserFilterClause::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
return false;
}
if (function.name == "count")
{
/// Remove child from function.arguments if it's '*' because countIf(*) is not supported.
/// See https://github.com/ClickHouse/ClickHouse/issues/61004
std::erase_if(function.arguments->children, [] (const ASTPtr & child)
{
return typeid_cast<const ASTAsterisk *>(child.get()) || typeid_cast<const ASTQualifiedAsterisk *>(child.get());
});
}
function.name += "If";
function.arguments->children.push_back(condition->children[0]);
return true;
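To illustrate the rewrite this guards (a sketch; the query is illustrative): `count(*)` with a FILTER clause is turned into a `countIf`, and the asterisk must not be carried over because `countIf(*)` is not supported.
``` sql
-- Written by the user:
SELECT count(*) FILTER (WHERE number % 2 = 0) FROM numbers(10);
-- Roughly what the parser produces after this change (no '*' argument):
SELECT countIf(number % 2 = 0) FROM numbers(10);
```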

View File

@ -1,6 +1,7 @@
#include "GRPCServer.h"
#include <limits>
#include <memory>
#include <Poco/Net/SocketAddress.h>
#if USE_GRPC
#include <Columns/ColumnString.h>
@ -320,8 +321,27 @@ namespace
Poco::Net::SocketAddress getClientAddress() const
{
String peer = grpc_context.peer();
return Poco::Net::SocketAddress{peer.substr(peer.find(':') + 1)};
/// Returns a string like ipv4:127.0.0.1:55930 or ipv6:%5B::1%5D:55930
String uri_encoded_peer = grpc_context.peer();
constexpr const std::string_view ipv4_prefix = "ipv4:";
constexpr const std::string_view ipv6_prefix = "ipv6:";
bool ipv4 = uri_encoded_peer.starts_with(ipv4_prefix);
bool ipv6 = uri_encoded_peer.starts_with(ipv6_prefix);
if (!ipv4 && !ipv6)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected ipv4 or ipv6 protocol in peer address, got {}", uri_encoded_peer);
auto prefix = ipv4 ? ipv4_prefix : ipv6_prefix;
auto family = ipv4 ? Poco::Net::AddressFamily::Family::IPv4 : Poco::Net::AddressFamily::Family::IPv6;
uri_encoded_peer= uri_encoded_peer.substr(prefix.length());
String peer;
Poco::URI::decode(uri_encoded_peer, peer);
return Poco::Net::SocketAddress{family, peer};
}
std::optional<String> getClientHeader(const String & key) const

View File

@ -794,6 +794,8 @@ bool TCPHandler::readDataNext()
/// We accept and process data.
read_ok = receivePacket();
/// Reset the timeout on Ping packet (NOTE: there is no Ping for INSERT queries yet).
watch.restart();
break;
}

View File

@ -8371,4 +8371,29 @@ bool MergeTreeData::initializeDiskOnConfigChange(const std::set<String> & new_ad
}
return true;
}
bool updateAlterConversionsMutations(const MutationCommands & commands, std::atomic<ssize_t> & alter_conversions_mutations, bool remove)
{
for (const auto & command : commands)
{
if (AlterConversions::supportsMutationCommandType(command.type))
{
if (remove)
{
--alter_conversions_mutations;
if (alter_conversions_mutations < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly mutations counter is negative ({})", alter_conversions_mutations);
}
else
{
if (alter_conversions_mutations < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly mutations counter is negative ({})", alter_conversions_mutations);
++alter_conversions_mutations;
}
return true;
}
}
return false;
}
}

View File

@ -1708,4 +1708,8 @@ struct CurrentlySubmergingEmergingTagger
|| (settings.min_compressed_bytes_to_fsync_after_merge && input_bytes >= settings.min_compressed_bytes_to_fsync_after_merge));
}
/// Checks whether MutationCommands contains mutations that require AlterConversions and updates the counter accordingly.
/// Returns true if the counter was updated.
bool updateAlterConversionsMutations(const MutationCommands & commands, std::atomic<ssize_t> & alter_conversions_mutations, bool remove);
}

View File

@ -6,8 +6,11 @@
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/noexcept_scope.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/CurrentMetrics.h>
#include <Storages/MutationCommands.h>
#include <base/defines.h>
#include <Parsers/formatAST.h>
#include <base/sort.h>
#include <ranges>
@ -942,7 +945,14 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
mutations_by_partition.erase(partition_and_block_num.first);
}
it = mutations_by_znode.erase(it);
if (!it->second.is_done)
{
const auto commands = entry.commands;
it = mutations_by_znode.erase(it);
updateAlterConversionsMutations(commands, alter_conversions_mutations, /* remove= */ true);
}
else
it = mutations_by_znode.erase(it);
}
else
++it;
@ -991,12 +1001,15 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version))
.first->second;
for (const auto & pair : entry->block_numbers)
{
const String & partition_id = pair.first;
Int64 block_num = pair.second;
mutations_by_partition[partition_id].emplace(block_num, &mutation);
}
updateAlterConversionsMutations(entry->commands, alter_conversions_mutations, /* remove= */ false);
NOEXCEPT_SCOPE({
for (const auto & pair : entry->block_numbers)
{
const String & partition_id = pair.first;
Int64 block_num = pair.second;
mutations_by_partition[partition_id].emplace(block_num, &mutation);
}
});
LOG_TRACE(log, "Adding mutation {} for {} partitions (data versions: {})",
entry->znode_name, entry->block_numbers.size(), entry->getBlockNumbersForLogs());
@ -1062,6 +1075,8 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
}
mutations_by_znode.erase(it);
/// updateAlterConversionsMutations() will be called in updateMutations()
LOG_DEBUG(log, "Removed mutation {} from local state.", entry->znode_name);
}
@ -1887,6 +1902,15 @@ ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zk
MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
{
int32_t part_metadata_version = part->getMetadataVersion();
int32_t metadata_version = storage.getInMemoryMetadataPtr()->getMetadataVersion();
chassert(alter_conversions_mutations >= 0);
/// NOTE: just checking part_metadata_version is not enough, since we
/// need to check for non-metadata mutations as well.
if (alter_conversions_mutations == 0 && metadata_version == part_metadata_version)
return {};
std::unique_lock lock(state_mutex);
auto in_partition = mutations_by_partition.find(part->info.partition_id);
@ -1894,13 +1918,18 @@ MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const
return {};
Int64 part_data_version = part->info.getDataVersion();
Int64 part_metadata_version = part->getMetadataVersion();
MutationCommands result;
bool seen_all_data_mutations = false;
bool seen_all_metadata_mutations = false;
auto add_to_result = [&](const ReplicatedMergeTreeMutationEntryPtr & entry)
{
for (const auto & command : entry->commands | std::views::reverse)
if (AlterConversions::supportsMutationCommandType(command.type))
result.emplace_back(command);
};
/// Here we return mutation commands for part which has bigger alter version than part metadata version.
/// Please note, we don't use getDataVersion(). It's because these alter commands are used for in-fly conversions
/// of part's metadata.
@ -1911,28 +1940,22 @@ MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const
auto & entry = mutation_status->entry;
auto add_to_result = [&] {
for (const auto & command : entry->commands | std::views::reverse)
if (AlterConversions::supportsMutationCommandType(command.type))
result.emplace_back(command);
};
auto alter_version = entry->alter_version;
if (alter_version != -1)
{
if (alter_version > storage.getInMemoryMetadataPtr()->getMetadataVersion())
if (alter_version > metadata_version)
continue;
/// We take commands with bigger metadata version
if (alter_version > part_metadata_version)
add_to_result();
add_to_result(entry);
else
seen_all_metadata_mutations = true;
}
else
{
if (mutation_version > part_data_version)
add_to_result();
add_to_result(entry);
else
seen_all_data_mutations = true;
}
@ -2019,6 +2042,8 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
"were executed concurrently on different replicas.", znode);
mutation.parts_to_do.clear();
}
updateAlterConversionsMutations(mutation.entry->commands, alter_conversions_mutations, /* remove= */ true);
}
else if (mutation.parts_to_do.size() == 0)
{
@ -2075,6 +2100,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
LOG_TRACE(log, "Finishing data alter with version {} for entry {}", entry->alter_version, entry->znode_name);
alter_sequence.finishDataAlter(entry->alter_version, lock);
}
updateAlterConversionsMutations(entry->commands, alter_conversions_mutations, /* remove= */ true);
}
}
}

View File

@ -151,6 +151,8 @@ private:
/// Mapping from znode path to Mutations Status
std::map<String, MutationStatus> mutations_by_znode;
/// Unfinished mutations that require AlterConversions (see getAlterMutationCommandsForPart())
std::atomic<ssize_t> alter_conversions_mutations = 0;
/// Partition -> (block_number -> MutationStatus)
std::unordered_map<String, std::map<Int64, MutationStatus *>> mutations_by_partition;
/// Znode ID of the latest mutation that is done.

View File

@ -521,9 +521,18 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, Context
String mutation_id = entry.file_name;
if (txn)
txn->addMutation(shared_from_this(), mutation_id);
bool alter_conversions_mutations_updated = updateAlterConversionsMutations(entry.commands, alter_conversions_mutations, /* remove= */ false);
bool inserted = current_mutations_by_version.try_emplace(version, std::move(entry)).second;
if (!inserted)
{
if (alter_conversions_mutations_updated)
{
--alter_conversions_mutations;
chassert(alter_conversions_mutations >= 0);
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", version);
}
LOG_INFO(log, "Added mutation: {}{}", mutation_id, additional_info);
}
@ -559,6 +568,8 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
entry.latest_fail_reason.clear();
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
mutation_backoff_policy.removePartFromFailed(failed_part->name);
updateAlterConversionsMutations(it->second.commands, alter_conversions_mutations, /* remove= */ true);
}
}
else
@ -837,8 +848,20 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
auto it = current_mutations_by_version.find(mutation_version);
if (it != current_mutations_by_version.end())
{
bool mutation_finished = true;
if (std::optional<Int64> min_version = getMinPartDataVersion())
mutation_finished = *min_version > static_cast<Int64>(mutation_version);
to_kill.emplace(std::move(it->second));
current_mutations_by_version.erase(it);
if (!mutation_finished)
{
const auto commands = it->second.commands;
current_mutations_by_version.erase(it);
updateAlterConversionsMutations(commands, alter_conversions_mutations, /* remove= */ true);
}
else
current_mutations_by_version.erase(it);
}
}
@ -916,6 +939,7 @@ void StorageMergeTree::loadMutations()
auto inserted = current_mutations_by_version.try_emplace(block_number, std::move(entry)).second;
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", block_number);
updateAlterConversionsMutations(entry.commands, alter_conversions_mutations, /* remove= */ false);
}
else if (startsWith(it->name(), "tmp_mutation_"))
{
@ -2159,10 +2183,6 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr local_context)
{
auto lock1 = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto lock2 = dest_table->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto merges_blocker = stopMergesAndWait();
auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table);
if (!dest_table_storage)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
@ -2176,6 +2196,13 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
this->getStoragePolicy()->getName(), dest_table_storage->getStorageID().getNameForLogs(),
dest_table_storage->getStoragePolicy()->getName());
// Use the same back-pressure (delay/throw) logic as for INSERTs to be consistent and avoid possibility of exceeding part limits using MOVE PARTITION queries
dest_table_storage->delayInsertOrThrowIfNeeded(nullptr, local_context, true);
auto lock1 = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto lock2 = dest_table->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto merges_blocker = stopMergesAndWait();
auto dest_metadata_snapshot = dest_table->getInMemoryMetadataPtr();
auto metadata_snapshot = getInMemoryMetadataPtr();
Stopwatch watch;
@ -2409,6 +2436,13 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
MutationCommands StorageMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
{
/// NOTE: there is no need to check part metadata_version, since
/// ALTER_METADATA cannot be done asynchronously, like in
/// ReplicatedMergeTree.
chassert(alter_conversions_mutations >= 0);
if (alter_conversions_mutations == 0)
return {};
std::lock_guard lock(currently_processing_in_background_mutex);
UInt64 part_data_version = part->info.getDataVersion();

View File

@ -147,6 +147,8 @@ private:
DataParts currently_merging_mutating_parts;
std::map<UInt64, MergeTreeMutationEntry> current_mutations_by_version;
/// Unfinished mutations that require AlterConversions (see getAlterMutationCommandsForPart())
std::atomic<ssize_t> alter_conversions_mutations = 0;
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};

View File

@ -8244,10 +8244,6 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context)
{
auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto lock2 = dest_table->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto storage_settings_ptr = getSettings();
auto dest_table_storage = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(dest_table);
if (!dest_table_storage)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
@ -8261,6 +8257,13 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
this->getStoragePolicy()->getName(), getStorageID().getNameForLogs(),
dest_table_storage->getStoragePolicy()->getName());
// Use the same back-pressure (delay/throw) logic as for INSERTs to be consistent and avoid possibility of exceeding part limits using MOVE PARTITION queries
dest_table_storage->delayInsertOrThrowIfNeeded(nullptr, query_context, true);
auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto lock2 = dest_table->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto storage_settings_ptr = getSettings();
auto dest_metadata_snapshot = dest_table->getInMemoryMetadataPtr();
auto metadata_snapshot = getInMemoryMetadataPtr();
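With this reordering, the destination table's INSERT back-pressure (delay or throw when it has too many parts) is applied before any locks are taken for the move. A sketch of the kind of statement affected (table names and the partition expression are illustrative):
``` sql
-- The destination's part-count limits are now checked up front, as for INSERTs.
ALTER TABLE source_table MOVE PARTITION 202401 TO TABLE dest_table;
```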

View File

@ -49,7 +49,7 @@ class CILabels(metaclass=WithIter):
CI_SET_FAST = "ci_set_fast"
CI_SET_ARM = "ci_set_arm"
CI_SET_INTEGRATION = "ci_set_integration"
CI_SET_ANALYZER = "ci_set_analyzer"
CI_SET_OLD_ANALYZER = "ci_set_old_analyzer"
CI_SET_STATLESS = "ci_set_stateless"
CI_SET_STATEFUL = "ci_set_stateful"
CI_SET_STATLESS_ASAN = "ci_set_stateless_asan"
@ -98,15 +98,16 @@ class JobNames(metaclass=WithIter):
STATELESS_TEST_TSAN = "Stateless tests (tsan)"
STATELESS_TEST_MSAN = "Stateless tests (msan)"
STATELESS_TEST_UBSAN = "Stateless tests (ubsan)"
STATELESS_TEST_ANALYZER_S3_REPLICATED_RELEASE = (
STATELESS_TEST_OLD_ANALYZER_S3_REPLICATED_RELEASE = (
"Stateless tests (release, old analyzer, s3, DatabaseReplicated)"
)
# merged into STATELESS_TEST_ANALYZER_S3_REPLICATED_RELEASE:
# STATELESS_TEST_ANALYZER_RELEASE = "Stateless tests (release, analyzer)"
# merged into STATELESS_TEST_OLD_ANALYZER_S3_REPLICATED_RELEASE:
# STATELESS_TEST_OLD_ANALYZER_RELEASE = "Stateless tests (release, analyzer)"
# STATELESS_TEST_DB_REPL_RELEASE = "Stateless tests (release, DatabaseReplicated)"
# STATELESS_TEST_S3_RELEASE = "Stateless tests (release, s3 storage)"
STATELESS_TEST_S3_DEBUG = "Stateless tests (debug, s3 storage)"
STATELESS_TEST_S3_TSAN = "Stateless tests (tsan, s3 storage)"
STATELESS_TEST_AZURE_ASAN = "Stateless tests (azure, asan)"
STATELESS_TEST_FLAKY_ASAN = "Stateless tests flaky check (asan)"
STATEFUL_TEST_DEBUG = "Stateful tests (debug)"
@ -129,10 +130,11 @@ class JobNames(metaclass=WithIter):
STRESS_TEST_UBSAN = "Stress test (ubsan)"
STRESS_TEST_MSAN = "Stress test (msan)"
STRESS_TEST_DEBUG = "Stress test (debug)"
STRESS_TEST_AZURE_TSAN = "Stress test (azure, tsan)"
INTEGRATION_TEST = "Integration tests (release)"
INTEGRATION_TEST_ASAN = "Integration tests (asan)"
INTEGRATION_TEST_ASAN_ANALYZER = "Integration tests (asan, old analyzer)"
INTEGRATION_TEST_ASAN_OLD_ANALYZER = "Integration tests (asan, old analyzer)"
INTEGRATION_TEST_TSAN = "Integration tests (tsan)"
INTEGRATION_TEST_ARM = "Integration tests (aarch64)"
INTEGRATION_TEST_FLAKY = "Integration tests flaky check (asan)"
@ -846,14 +848,14 @@ CI_CONFIG = CIConfig(
JobNames.INTEGRATION_TEST,
]
),
CILabels.CI_SET_ANALYZER: LabelConfig(
CILabels.CI_SET_OLD_ANALYZER: LabelConfig(
run_jobs=[
JobNames.STYLE_CHECK,
JobNames.FAST_TEST,
Build.PACKAGE_RELEASE,
Build.PACKAGE_ASAN,
JobNames.STATELESS_TEST_ANALYZER_S3_REPLICATED_RELEASE,
JobNames.INTEGRATION_TEST_ASAN_ANALYZER,
JobNames.STATELESS_TEST_OLD_ANALYZER_S3_REPLICATED_RELEASE,
JobNames.INTEGRATION_TEST_ASAN_OLD_ANALYZER,
]
),
CILabels.CI_SET_STATLESS: LabelConfig(
@ -1193,7 +1195,7 @@ CI_CONFIG = CIConfig(
JobNames.STATELESS_TEST_AARCH64: TestConfig(
Build.PACKAGE_AARCH64, job_config=JobConfig(**statless_test_common_params) # type: ignore
),
JobNames.STATELESS_TEST_ANALYZER_S3_REPLICATED_RELEASE: TestConfig(
JobNames.STATELESS_TEST_OLD_ANALYZER_S3_REPLICATED_RELEASE: TestConfig(
Build.PACKAGE_RELEASE,
job_config=JobConfig(num_batches=4, **statless_test_common_params), # type: ignore
),
@ -1201,6 +1203,10 @@ CI_CONFIG = CIConfig(
Build.PACKAGE_DEBUG,
job_config=JobConfig(num_batches=6, **statless_test_common_params), # type: ignore
),
JobNames.STATELESS_TEST_AZURE_ASAN: TestConfig(
Build.PACKAGE_ASAN,
job_config=JobConfig(num_batches=4, **statless_test_common_params, release_only=True), # type: ignore
),
JobNames.STATELESS_TEST_S3_TSAN: TestConfig(
Build.PACKAGE_TSAN,
job_config=JobConfig(num_batches=5, **statless_test_common_params), # type: ignore
@ -1223,6 +1229,9 @@ CI_CONFIG = CIConfig(
JobNames.UPGRADE_TEST_ASAN: TestConfig(
Build.PACKAGE_ASAN, job_config=JobConfig(pr_only=True, random_bucket="upgrade_with_sanitizer", **upgrade_test_common_params) # type: ignore
),
JobNames.STRESS_TEST_AZURE_TSAN: TestConfig(
Build.PACKAGE_TSAN, job_config=JobConfig(**stress_test_common_params, release_only=True) # type: ignore
),
JobNames.UPGRADE_TEST_TSAN: TestConfig(
Build.PACKAGE_TSAN, job_config=JobConfig(pr_only=True, random_bucket="upgrade_with_sanitizer", **upgrade_test_common_params) # type: ignore
),
@ -1236,7 +1245,7 @@ CI_CONFIG = CIConfig(
Build.PACKAGE_ASAN,
job_config=JobConfig(num_batches=4, **integration_test_common_params, release_only=True), # type: ignore
),
JobNames.INTEGRATION_TEST_ASAN_ANALYZER: TestConfig(
JobNames.INTEGRATION_TEST_ASAN_OLD_ANALYZER: TestConfig(
Build.PACKAGE_ASAN,
job_config=JobConfig(num_batches=6, **integration_test_common_params), # type: ignore
),
@ -1368,8 +1377,8 @@ REQUIRED_CHECKS = [
JobNames.UNIT_TEST,
JobNames.UNIT_TEST_TSAN,
JobNames.UNIT_TEST_UBSAN,
JobNames.INTEGRATION_TEST_ASAN_ANALYZER,
JobNames.STATELESS_TEST_ANALYZER_S3_REPLICATED_RELEASE,
JobNames.INTEGRATION_TEST_ASAN_OLD_ANALYZER,
JobNames.STATELESS_TEST_OLD_ANALYZER_S3_REPLICATED_RELEASE,
]

View File

@ -15,6 +15,7 @@ from clickhouse_helper import CiLogsCredentials
from docker_images_helper import DockerImage, get_docker_image, pull_image
from download_release_packages import download_last_release
from env_helper import REPO_COPY, REPORT_PATH, TEMP_PATH
from get_robot_token import get_parameter_from_ssm
from pr_info import PRInfo
from report import ERROR, SUCCESS, JobReport, StatusType, TestResults, read_test_results
from stopwatch import Stopwatch
@ -27,6 +28,8 @@ def get_additional_envs(
check_name: str, run_by_hash_num: int, run_by_hash_total: int
) -> List[str]:
result = []
azure_connection_string = get_parameter_from_ssm("azure_connection_string")
result.append(f"AZURE_CONNECTION_STRING='{azure_connection_string}'")
if "DatabaseReplicated" in check_name:
result.append("USE_DATABASE_REPLICATED=1")
if "DatabaseOrdinary" in check_name:
@ -40,6 +43,9 @@ def get_additional_envs(
result.append("RANDOMIZE_OBJECT_KEY_TYPE=1")
if "analyzer" in check_name:
result.append("USE_OLD_ANALYZER=1")
if "azure" in check_name:
assert "USE_S3_STORAGE_FOR_MERGE_TREE=1" not in result
result.append("USE_AZURE_STORAGE_FOR_MERGE_TREE=1")
if run_by_hash_total != 0:
result.append(f"RUN_BY_HASH_NUM={run_by_hash_num}")

View File

@ -265,7 +265,9 @@ class ClickhouseIntegrationTestsRunner:
self.start_time = time.time()
self.soft_deadline_time = self.start_time + (TASK_TIMEOUT - MAX_TIME_IN_SANDBOX)
self.use_analyzer = os.environ.get("CLICKHOUSE_USE_OLD_ANALYZER") is not None
self.use_old_analyzer = (
os.environ.get("CLICKHOUSE_USE_OLD_ANALYZER") is not None
)
if "run_by_hash_total" in self.params:
self.run_by_hash_total = self.params["run_by_hash_total"]
@ -414,8 +416,8 @@ class ClickhouseIntegrationTestsRunner:
result.append("--tmpfs")
if self.disable_net_host:
result.append("--disable-net-host")
if self.use_analyzer:
result.append("--analyzer")
if self.use_old_analyzer:
result.append("--old-analyzer")
return " ".join(result)

View File

@ -19,7 +19,6 @@ def get_options(i: int, upgrade_check: bool) -> str:
if i % 3 == 2 and not upgrade_check:
options.append(f'''--db-engine="Replicated('/test/db/test_{i}', 's1', 'r1')"''')
client_options.append("allow_experimental_database_replicated=1")
client_options.append("enable_deflate_qpl_codec=1")
client_options.append("enable_zstd_qat_codec=1")

View File

@ -12,18 +12,23 @@ from build_download_helper import download_all_deb_packages
from clickhouse_helper import CiLogsCredentials
from docker_images_helper import DockerImage, get_docker_image, pull_image
from env_helper import REPO_COPY, REPORT_PATH, TEMP_PATH
from get_robot_token import get_parameter_from_ssm
from pr_info import PRInfo
from report import ERROR, JobReport, TestResult, TestResults, read_test_results
from stopwatch import Stopwatch
from tee_popen import TeePopen
def get_additional_envs() -> List[str]:
def get_additional_envs(check_name: str) -> List[str]:
result = []
azure_connection_string = get_parameter_from_ssm("azure_connection_string")
result.append(f"AZURE_CONNECTION_STRING='{azure_connection_string}'")
# some cloud-specific features require feature flags enabled
# so we need this ENV to be able to disable the randomization
# of feature flags
result.append("RANDOMIZE_KEEPER_FEATURE_FLAGS=1")
if "azure" in check_name:
result.append("USE_AZURE_STORAGE_FOR_MERGE_TREE=1")
return result
@ -143,7 +148,7 @@ def run_stress_test(docker_image_name: str) -> None:
pr_info, stopwatch.start_time_str, check_name
)
additional_envs = get_additional_envs()
additional_envs = get_additional_envs(check_name)
run_command = get_run_command(
packages_path,

View File

@ -4,7 +4,6 @@
import unittest
from ci import CiOptions
from ci_config import JobNames
_TEST_BODY_1 = """
#### Run only:
@ -44,6 +43,85 @@ _TEST_BODY_3 = """
- [x] <!---ci_include_analyzer--> Must include all tests for analyzer
"""
_TEST_JOB_LIST = [
"Style check",
"Fast test",
"package_release",
"package_asan",
"Docker server image",
"Docker keeper image",
"Install packages (amd64)",
"Install packages (arm64)",
"Stateless tests (debug)",
"Stateless tests (release)",
"Stateless tests (coverage)",
"Stateless tests (aarch64)",
"Stateless tests (asan)",
"Stateless tests (tsan)",
"Stateless tests (msan)",
"Stateless tests (ubsan)",
"Stateless tests (release, old analyzer, s3, DatabaseReplicated)",
"Stateless tests (debug, s3 storage)",
"Stateless tests (tsan, s3 storage)",
"Stateless tests flaky check (asan)",
"Stateful tests (debug)",
"Stateful tests (release)",
"Stateful tests (coverage)",
"Stateful tests (aarch64)",
"Stateful tests (asan)",
"Stateful tests (tsan)",
"Stateful tests (msan)",
"Stateful tests (ubsan)",
"Stateful tests (release, ParallelReplicas)",
"Stateful tests (debug, ParallelReplicas)",
"Stateful tests (asan, ParallelReplicas)",
"Stateful tests (msan, ParallelReplicas)",
"Stateful tests (ubsan, ParallelReplicas)",
"Stateful tests (tsan, ParallelReplicas)",
"Stress test (asan)",
"Stress test (tsan)",
"Stress test (ubsan)",
"Stress test (msan)",
"Stress test (debug)",
"Integration tests (release)",
"Integration tests (asan)",
"Integration tests (asan, old analyzer)",
"Integration tests (tsan)",
"Integration tests (aarch64)",
"Integration tests flaky check (asan)",
"Upgrade check (debug)",
"Upgrade check (asan)",
"Upgrade check (tsan)",
"Upgrade check (msan)",
"Unit tests (release)",
"Unit tests (asan)",
"Unit tests (msan)",
"Unit tests (tsan)",
"Unit tests (ubsan)",
"AST fuzzer (debug)",
"AST fuzzer (asan)",
"AST fuzzer (msan)",
"AST fuzzer (tsan)",
"AST fuzzer (ubsan)",
"ClickHouse Keeper Jepsen",
"ClickHouse Server Jepsen",
"Performance Comparison",
"Performance Comparison Aarch64",
"Sqllogic test (release)",
"SQLancer (release)",
"SQLancer (debug)",
"SQLTest",
"Compatibility check (amd64)",
"Compatibility check (aarch64)",
"ClickBench (amd64)",
"ClickBench (aarch64)",
"libFuzzer tests",
"ClickHouse build check",
"ClickHouse special build check",
"Docs check",
"Bugfix validation",
]
class TestCIOptions(unittest.TestCase):
def test_pr_body_parsing(self):
@ -69,7 +147,7 @@ class TestCIOptions(unittest.TestCase):
ci_options.exclude_keywords,
["tsan", "aarch64", "analyzer", "s3_storage", "coverage"],
)
jobs_to_do = list(JobNames)
jobs_to_do = list(_TEST_JOB_LIST)
jobs_to_skip = []
job_params = {}
jobs_to_do, jobs_to_skip, job_params = ci_options.apply(
@ -81,9 +159,6 @@ class TestCIOptions(unittest.TestCase):
"Style check",
"package_release",
"package_asan",
"package_ubsan",
"package_debug",
"package_msan",
"Stateless tests (asan)",
"Stateless tests flaky check (asan)",
"Stateless tests (msan)",
@ -103,7 +178,7 @@ class TestCIOptions(unittest.TestCase):
)
self.assertCountEqual(ci_options.include_keywords, ["analyzer"])
self.assertIsNone(ci_options.exclude_keywords)
jobs_to_do = list(JobNames)
jobs_to_do = list(_TEST_JOB_LIST)
jobs_to_skip = []
job_params = {}
jobs_to_do, jobs_to_skip, job_params = ci_options.apply(

View File

@ -1646,8 +1646,8 @@ class TestCase:
client_options = self.add_random_settings(client_options)
if not is_valid_utf_8(self.case_file) or not is_valid_utf_8(
self.reference_file
if not is_valid_utf_8(self.case_file) or (
self.reference_file and not is_valid_utf_8(self.reference_file)
):
proc, stdout, stderr, debug_log, total_time = self.run_single_test(
server_logs_level, client_options

View File

@ -4,13 +4,10 @@
<azure>
<type>object_storage</type>
<object_storage_type>azure</object_storage_type>
<storage_account_url>http://localhost:10000/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<!-- default credentials for Azurite storage account -->
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
<max_single_part_upload_size>33554432</max_single_part_upload_size>
<container_name>openbucketforpublicci</container_name>
<connection_string from_env="AZURE_CONNECTION_STRING"/>
</azure>
<cached_azure>
<type>cache</type>

View File

@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<storage_policy>azure_cache</storage_policy>
</merge_tree>
</clickhouse>

View File

@ -156,7 +156,7 @@ if [[ -n "$USE_DATABASE_ORDINARY" ]] && [[ "$USE_DATABASE_ORDINARY" -eq 1 ]]; th
ln -sf $SRC_PATH/users.d/database_ordinary.xml $DEST_SERVER_PATH/users.d/
fi
if [[ -n "$USE_S3_STORAGE_FOR_MERGE_TREE" ]] && [[ "$USE_S3_STORAGE_FOR_MERGE_TREE" -eq 1 ]]; then
if [[ "$USE_S3_STORAGE_FOR_MERGE_TREE" == "1" ]]; then
object_key_types_options=("generate-suffix" "generate-full-key" "generate-template-key")
object_key_type="${object_key_types_options[0]}"
@ -177,6 +177,8 @@ if [[ -n "$USE_S3_STORAGE_FOR_MERGE_TREE" ]] && [[ "$USE_S3_STORAGE_FOR_MERGE_TR
ln -sf $SRC_PATH/config.d/s3_storage_policy_by_default.xml $DEST_SERVER_PATH/config.d/
;;
esac
elif [[ "$USE_AZURE_STORAGE_FOR_MERGE_TREE" == "1" ]]; then
ln -sf $SRC_PATH/config.d/azure_storage_policy_by_default.xml $DEST_SERVER_PATH/config.d/
fi
ARM="aarch64"

View File

@ -1,7 +1,6 @@
<clickhouse>
<profiles>
<default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<distributed_ddl_output_mode>none</distributed_ddl_output_mode>
<database_replicated_initial_query_timeout_sec>120</database_replicated_initial_query_timeout_sec>
<distributed_ddl_task_timeout>120</distributed_ddl_task_timeout>

View File

@ -1601,7 +1601,7 @@ class ClickHouseCluster:
with_jdbc_bridge=False,
with_hive=False,
with_coredns=False,
use_old_analyzer=False,
use_old_analyzer=None,
hostname=None,
env_variables=None,
instance_env_variables=False,
@ -4405,8 +4405,18 @@ class ClickHouseInstance:
)
write_embedded_config("0_common_instance_users.xml", users_d_dir)
if self.use_old_analyzer:
write_embedded_config("0_common_enable_analyzer.xml", users_d_dir)
use_old_analyzer = os.environ.get("CLICKHOUSE_USE_OLD_ANALYZER") is not None
# If a specific version was used, there can be no
# allow_experimental_analyzer setting, so do this only if it was
# explicitly requested.
if self.tag:
use_old_analyzer = False
# Prefer specified in the test option:
if self.use_old_analyzer is not None:
use_old_analyzer = self.use_old_analyzer
if use_old_analyzer:
write_embedded_config("0_common_enable_old_analyzer.xml", users_d_dir)
if len(self.custom_dictionaries_paths):
write_embedded_config("0_common_enable_dictionaries.xml", self.config_d_dir)

View File

@ -285,11 +285,11 @@ if __name__ == "__main__":
)
parser.add_argument(
"--analyzer",
"--old-analyzer",
action="store_true",
default=False,
dest="analyzer",
help="Use new analyzer infrastructure",
dest="old_analyzer",
help="Use old analyzer infrastructure",
)
parser.add_argument(
@ -385,9 +385,9 @@ if __name__ == "__main__":
if args.keyword_expression:
args.pytest_args += ["-k", args.keyword_expression]
use_analyzer = ""
if args.analyzer:
use_analyzer = "-e CLICKHOUSE_USE_OLD_ANALYZER=1"
use_old_analyzer = ""
if args.old_analyzer:
use_old_analyzer = "-e CLICKHOUSE_USE_OLD_ANALYZER=1"
# NOTE: since pytest options is in the argument value already we need to additionally escape '"'
pytest_opts = " ".join(
@ -408,7 +408,7 @@ if __name__ == "__main__":
f"--volume={args.utils_dir}/backupview:/ClickHouse/utils/backupview "
f"--volume={args.utils_dir}/grpc-client/pb2:/ClickHouse/utils/grpc-client/pb2 "
f"--volume=/run:/run/host:ro {dockerd_internal_volume} {env_tags} {env_cleanup} "
f"-e DOCKER_CLIENT_TIMEOUT=300 -e COMPOSE_HTTP_TIMEOUT=600 {use_analyzer} -e PYTHONUNBUFFERED=1 "
f"-e DOCKER_CLIENT_TIMEOUT=300 -e COMPOSE_HTTP_TIMEOUT=600 {use_old_analyzer} -e PYTHONUNBUFFERED=1 "
f'-e PYTEST_ADDOPTS="{parallel_args} {pytest_opts} {tests_list} {rand_args} -vvv"'
f" {DIND_INTEGRATION_TESTS_IMAGE_NAME}:{args.docker_image_version}"
)
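Together with the helpers/cluster.py hunk above, the renamed option forms a small chain: --old-analyzer adds -e CLICKHOUSE_USE_OLD_ANALYZER=1 to the docker command, and cluster.py reads that variable as the per-instance default unless a test overrides it. A compact sketch of that chain with illustrative names (not the real runner code):

# Illustrative propagation of the --old-analyzer flag into the test environment.
import os

def docker_env_flags(old_analyzer: bool) -> str:
    # The runner turns the CLI flag into a docker "-e" argument.
    return "-e CLICKHOUSE_USE_OLD_ANALYZER=1" if old_analyzer else ""

def effective_old_analyzer(instance_override=None, tag=None) -> bool:
    # Inside the container, the default is derived from the environment,
    # disabled for old tagged images, and overridable per instance.
    use_old = os.environ.get("CLICKHOUSE_USE_OLD_ANALYZER") is not None
    if tag:
        use_old = False
    if instance_override is not None:
        use_old = instance_override
    return use_old

print(docker_env_flags(True))    # "-e CLICKHOUSE_USE_OLD_ANALYZER=1"
print(effective_old_analyzer())  # depends on the caller's environment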

View File

@ -1,7 +1,6 @@
<clickhouse>
<profiles>
<default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
</default>
</profiles>
<users>

View File

@ -1,7 +1,6 @@
<clickhouse>
<profiles>
<default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<allow_deprecated_database_ordinary>1</allow_deprecated_database_ordinary>
</default>
</profiles>

View File

@ -1,7 +1,6 @@
<clickhouse>
<profiles>
<default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
</default>
</profiles>
</clickhouse>

View File

@ -295,7 +295,6 @@ def test_replicated_database(cluster):
node1 = cluster.instances["node3"]
node1.query(
"CREATE DATABASE rdb ENGINE=Replicated('/test/rdb', 's1', 'r1')",
settings={"allow_experimental_database_replicated": 1},
)
global uuids
@ -312,7 +311,6 @@ def test_replicated_database(cluster):
node2 = cluster.instances["node2"]
node2.query(
"CREATE DATABASE rdb ENGINE=Replicated('/test/rdb', 's1', 'r2')",
settings={"allow_experimental_database_replicated": 1},
)
node2.query("SYSTEM SYNC DATABASE REPLICA rdb")

View File

@ -7,7 +7,7 @@ import uuid
import time
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION
cluster = ClickHouseCluster(__file__)
@ -23,9 +23,20 @@ def make_instance(name, cfg, *args, **kwargs):
)
# DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 was added in 23.3; ensure that CLICKHOUSE_CI_MIN_TESTED_VERSION fits
assert CLICKHOUSE_CI_MIN_TESTED_VERSION < "23.3"
# _n1/_n2 contain a cluster with different <secret> -- should fail
n1 = make_instance("n1", "configs/remote_servers_n1.xml")
n2 = make_instance("n2", "configs/remote_servers_n2.xml")
backward = make_instance(
"backward",
"configs/remote_servers_backward.xml",
image="clickhouse/clickhouse-server",
# version without DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
with_installed_binary=True,
)
users = pytest.mark.parametrize(
"user,password",
@ -399,3 +410,25 @@ def test_per_user_protocol_settings_secure_cluster(user, password):
assert int(get_query_setting_on_shard(n1, id_, "max_memory_usage_for_user")) == int(
1e9
)
@users
def test_user_secure_cluster_with_backward(user, password):
id_ = "with-backward-query-dist_secure-" + user
n1.query(
f"SELECT *, '{id_}' FROM dist_secure_backward", user=user, password=password
)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]
@users
def test_user_secure_cluster_from_backward(user, password):
id_ = "from-backward-query-dist_secure-" + user
backward.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]
assert n1.contains_in_log(
"Using deprecated interserver protocol because the client is too old. Consider upgrading all nodes in cluster."
)

View File

@ -3,7 +3,6 @@
<tcp_port>9000</tcp_port>
<profiles>
<default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
</default>
</profiles>
<users>

View File

@ -37,18 +37,13 @@ def start_cluster():
def test_drop_if_empty(start_cluster):
settings = {
"allow_experimental_database_replicated": 1,
}
node1.query(
"CREATE DATABASE replicateddb "
"ENGINE = Replicated('/clickhouse/databases/replicateddb', 'shard1', 'node1')",
settings=settings,
)
node2.query(
"CREATE DATABASE replicateddb "
"ENGINE = Replicated('/clickhouse/databases/replicateddb', 'shard1', 'node2')",
settings=settings,
)
node1.query(
"CREATE TABLE default.tbl ON CLUSTER 'cluster' ("

View File

@ -66,7 +66,6 @@ def test_ddl(started_cluster):
def test_ddl_replicated(started_cluster):
control_node.query(
"CREATE DATABASE test_db ON CLUSTER 'external' ENGINE=Replicated('/replicated')",
settings={"allow_experimental_database_replicated": 1},
)
# Exception is expected
assert "It's not initial query" in control_node.query_and_get_error(

View File

@ -27,6 +27,8 @@ if is_arm():
# Utilities
IPV6_ADDRESS = "2001:3984:3989::1:1111"
config_dir = os.path.join(script_dir, "./configs")
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
@ -36,12 +38,15 @@ node = cluster.add_instance(
env_variables={
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS", default="")
},
ipv6_address=IPV6_ADDRESS,
)
main_channel = None
def create_channel():
node_ip_with_grpc_port = cluster.get_instance_ip("node") + ":" + str(GRPC_PORT)
def create_channel(hostname=None):
if not hostname:
hostname = cluster.get_instance_ip("node")
node_ip_with_grpc_port = hostname + ":" + str(GRPC_PORT)
channel = grpc.insecure_channel(node_ip_with_grpc_port)
grpc.channel_ready_future(channel).result(timeout=10)
global main_channel
@ -204,6 +209,11 @@ def test_select_one():
assert query("SELECT 1") == "1\n"
def test_ipv6_select_one():
with create_channel(f"[{IPV6_ADDRESS}]") as channel:
assert query("SELECT 1", channel=channel) == "1\n"
def test_ordinary_query():
assert query("SELECT count() FROM numbers(100)") == "100\n"

View File

@ -2,7 +2,6 @@
<profiles>
<default>
<allow_drop_detached>1</allow_drop_detached>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<allow_experimental_alter_materialized_view_structure>1</allow_experimental_alter_materialized_view_structure>
<allow_experimental_object_type>0</allow_experimental_object_type>
<allow_suspicious_codecs>0</allow_suspicious_codecs>

View File

@ -2,7 +2,6 @@
<profiles>
<default>
<allow_drop_detached>1</allow_drop_detached>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<allow_experimental_alter_materialized_view_structure>1</allow_experimental_alter_materialized_view_structure>
</default>
</profiles>

View File

@ -32,4 +32,3 @@
<drop_query>drop table alter_select_{engine}</drop_query>
</test>

View File

@ -15,7 +15,7 @@ function create_db()
SUFFIX=$(($RANDOM % 16))
# Multiple database replicas on one server are actually not supported (until we have namespaces).
# So CREATE TABLE queries will fail on all replicas except one. But it still makes sense for a stress test.
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 --query \
$CLICKHOUSE_CLIENT --query \
"create database if not exists ${CLICKHOUSE_DATABASE}_repl_01111_$SUFFIX engine=Replicated('/test/01111/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX', '$SHARD', '$REPLICA')" \
2>&1| grep -Fa "Exception: " | grep -Fv "REPLICA_ALREADY_EXISTS" | grep -Fiv "Will not try to start it up" | \
grep -Fv "Coordination::Exception" | grep -Fv "already contains some data and it does not look like Replicated database path"

View File

@ -43,7 +43,6 @@ DROP DATABASE test_01148_atomic;
DROP TABLE rmt;
DROP TABLE rmt1;
SET allow_experimental_database_replicated=1;
DROP DATABASE IF EXISTS imdb_01148;
CREATE DATABASE imdb_01148 ENGINE = Replicated('/test/databases/imdb_01148', '{shard}', '{replica}');
CREATE TABLE imdb_01148.movie_directors (`director_id` UInt64, `movie_id` UInt64) ENGINE = ReplicatedMergeTree ORDER BY (director_id, movie_id) SETTINGS index_granularity = 8192;

View File

@ -34,5 +34,4 @@ test_db_comments "Ordinary"
test_db_comments "Lazy(1)"
# test_db_comments "MySQL('127.0.0.1:9004', 'default', 'default', '')" # fails due to CH internal reasons
# test_db_comments "SQLite('dummy_sqlitedb')"
## needs to be explicitly enabled with `SET allow_experimental_database_replicated=1`
# test_db_comments "Replicated('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX', '1') ORDER BY k"

View File

@ -10,7 +10,7 @@ ${CLICKHOUSE_CLIENT} -q "create table mute_stylecheck (x UInt32) engine = Replic
${CLICKHOUSE_CLIENT} -q "CREATE USER user_${CLICKHOUSE_DATABASE} settings database_replicated_allow_only_replicated_engine=1"
${CLICKHOUSE_CLIENT} -q "GRANT CREATE TABLE ON ${CLICKHOUSE_DATABASE}_db.* TO user_${CLICKHOUSE_DATABASE}"
${CLICKHOUSE_CLIENT} -q "GRANT TABLE ENGINE ON Memory, TABLE ENGINE ON MergeTree, TABLE ENGINE ON ReplicatedMergeTree TO user_${CLICKHOUSE_DATABASE}"
${CLICKHOUSE_CLIENT} --allow_experimental_database_replicated=1 --query "CREATE DATABASE ${CLICKHOUSE_DATABASE}_db engine = Replicated('/clickhouse/databases/${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}/${CLICKHOUSE_DATABASE}_db', '{shard}', '{replica}')"
${CLICKHOUSE_CLIENT} -q "CREATE DATABASE ${CLICKHOUSE_DATABASE}_db engine = Replicated('/clickhouse/databases/${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}/${CLICKHOUSE_DATABASE}_db', '{shard}', '{replica}')"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --user "user_${CLICKHOUSE_DATABASE}" --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.tab_memory (x UInt32) engine = Memory;"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --user "user_${CLICKHOUSE_DATABASE}" -n --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.tab_mt (x UInt32) engine = MergeTree order by x;" 2>&1 | grep -o "Only tables with a Replicated engine"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -n --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.tab_mt (x UInt32) engine = MergeTree order by x;"

View File

@ -21,7 +21,6 @@ select * from t_l5ydey order by c_qv5rv;
show create t_l5ydey;
-- Check that the correct error code is returned when creating a database with the same path as an existing table
set allow_experimental_database_replicated=1;
create database local_t_l5ydey engine=Replicated('/clickhouse/tables/test_' || currentDatabase() || '/{shard}/local_t_l5ydey', '1', '1'); -- { serverError BAD_ARGUMENTS }
drop table local_t_l5ydey;

View File

@ -8,7 +8,7 @@ db="rdb_$CLICKHOUSE_DATABASE"
db2="${db}_2"
db3="${db}_3"
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 -q "create database $db engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1')"
$CLICKHOUSE_CLIENT -q "create database $db engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1')"
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -q "create table $db.mt (n int) engine=MergeTree order by tuple()"
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -q "create table $db.rmt (n int) engine=ReplicatedMergeTree order by tuple()"
@ -16,8 +16,8 @@ $CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -q "create table $db.rmt (
$CLICKHOUSE_CLIENT -q "insert into $db.rmt values (0), (1)"
$CLICKHOUSE_CLIENT -q "insert into $db.mt values (0), (1)"
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 -q "create database $db2 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r2')"
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 -q "create database $db3 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's2', 'r1')"
$CLICKHOUSE_CLIENT -q "create database $db2 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r2')"
$CLICKHOUSE_CLIENT -q "create database $db3 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's2', 'r1')"
$CLICKHOUSE_CLIENT -q "alter table $db.mt drop partition id 'all', add column m int" 2>&1| grep -Eo "not allowed to execute ALTERs of different types" | head -1
$CLICKHOUSE_CLIENT -q "alter table $db.rmt drop partition id 'all', add column m int" 2>&1| grep -Eo "not allowed to execute ALTERs of different types" | head -1

View File

@ -7,7 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
db="rdb_$CLICKHOUSE_DATABASE"
$CLICKHOUSE_CLIENT -q "system flush logs"
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 -q "create database $db engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1')"
$CLICKHOUSE_CLIENT -q "create database $db engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1')"
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -q "create table $db.t as system.query_log" # Suppress style check: current_database=$CLICKHOUSE_DATABASE
$CLICKHOUSE_CLIENT -q "show tables from $db"
@ -26,8 +26,8 @@ $CLICKHOUSE_CLIENT -q "system drop database replica 's2/r1' from zkpath '/test/$
db2="${db}_2"
db3="${db}_3"
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 -q "create database $db2 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r2')"
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 -q "create database $db3 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's2', 'r1')"
$CLICKHOUSE_CLIENT -q "create database $db2 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r2')"
$CLICKHOUSE_CLIENT -q "create database $db3 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's2', 'r1')"
$CLICKHOUSE_CLIENT -q "system sync database replica $db"
$CLICKHOUSE_CLIENT -q "select cluster, shard_num, replica_num, database_shard_name, database_replica_name, is_active from system.clusters where cluster='$db' and shard_num=1 and replica_num=1"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from database $db2" 2>&1| grep -Fac "is active, cannot drop it"
@ -56,7 +56,7 @@ $CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -q "create table $db.t2 as
$CLICKHOUSE_CLIENT -q "show tables from $db"
db4="${db}_4"
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 -q "create database $db4 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1')"
$CLICKHOUSE_CLIENT -q "create database $db4 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1')"
$CLICKHOUSE_CLIENT -q "system sync database replica $db4"
$CLICKHOUSE_CLIENT -q "select cluster, shard_num, replica_num, database_shard_name, database_replica_name, is_active from system.clusters where cluster='$db4'"

View File

@ -1,8 +1,6 @@
-- Tags: no-parallel
DROP DATABASE IF EXISTS replicated_database_test;
SET allow_experimental_database_replicated=1;
CREATE DATABASE IF NOT EXISTS replicated_database_test ENGINE = Replicated('some/path/' || currentDatabase() || '/replicated_database_test', 'shard_1', 'replica_1') SETTINGS max_broken_tables_ratio=1;
SELECT engine_full FROM system.databases WHERE name = 'replicated_database_test';
DROP DATABASE IF EXISTS replicated_database_test;
DROP DATABASE IF EXISTS replicated_database_test;

View File

@ -13,7 +13,7 @@ ${CLICKHOUSE_CLIENT} -q "create table mute_stylecheck (x UInt32) engine = Replic
${CLICKHOUSE_CLIENT} -q "CREATE USER user_${CLICKHOUSE_DATABASE} settings database_replicated_allow_replicated_engine_arguments=0"
${CLICKHOUSE_CLIENT} -q "GRANT CREATE TABLE ON ${CLICKHOUSE_DATABASE}_db.* TO user_${CLICKHOUSE_DATABASE}"
${CLICKHOUSE_CLIENT} -q "GRANT TABLE ENGINE ON ReplicatedMergeTree TO user_${CLICKHOUSE_DATABASE}"
${CLICKHOUSE_CLIENT} --allow_experimental_database_replicated=1 --query "CREATE DATABASE ${CLICKHOUSE_DATABASE}_db engine = Replicated('/clickhouse/databases/${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}/${CLICKHOUSE_DATABASE}_db', '{shard}', '{replica}')"
${CLICKHOUSE_CLIENT} -q "CREATE DATABASE ${CLICKHOUSE_DATABASE}_db engine = Replicated('/clickhouse/databases/${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}/${CLICKHOUSE_DATABASE}_db', '{shard}', '{replica}')"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --user "user_${CLICKHOUSE_DATABASE}" -n --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.tab_rmt_ok (x UInt32) engine = ReplicatedMergeTree order by x;"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --user "user_${CLICKHOUSE_DATABASE}" -n --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.tab_rmt_fail (x UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/root/{shard}', '{replica}') order by x; -- { serverError 80 }"
${CLICKHOUSE_CLIENT} --query "DROP DATABASE ${CLICKHOUSE_DATABASE}_db"

View File

@ -1,26 +1,38 @@
ALTER TABLE wrong_metadata RENAME COLUMN a TO a1, RENAME COLUMN b to b1 SETTINGS replication_alter_partitions_sync = 0;
SELECT * FROM wrong_metadata ORDER BY a1 FORMAT JSONEachRow;
{"a1":"1","b1":"2","c":"3"}
~~~~~~~
INSERT INTO wrong_metadata VALUES (4, 5, 6);
SELECT * FROM wrong_metadata ORDER BY a1 FORMAT JSONEachRow;
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
~~~~~~~
ALTER TABLE wrong_metadata RENAME COLUMN a1 TO b, RENAME COLUMN b1 to a SETTINGS replication_alter_partitions_sync = 0;
INSERT INTO wrong_metadata VALUES (7, 8, 9);
SELECT * FROM wrong_metadata ORDER by a1 FORMAT JSONEachRow;
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
{"a1":"7","b1":"8","c":"9"}
~~~~~~~
SYSTEM START MERGES wrong_metadata;
SYSTEM SYNC REPLICA wrong_metadata;
SELECT * FROM wrong_metadata order by a FORMAT JSONEachRow;
{"b":"1","a":"2","c":"3"}
{"b":"4","a":"5","c":"6"}
{"b":"7","a":"8","c":"9"}
~~~~~~~
ALTER TABLE wrong_metadata_compact RENAME COLUMN a TO a1, RENAME COLUMN b to b1 SETTINGS replication_alter_partitions_sync = 0;
SELECT * FROM wrong_metadata_compact ORDER BY a1 FORMAT JSONEachRow;
{"a1":"1","b1":"2","c":"3"}
~~~~~~~
INSERT INTO wrong_metadata_compact VALUES (4, 5, 6);
SELECT * FROM wrong_metadata_compact ORDER BY a1 FORMAT JSONEachRow;
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
~~~~~~~
ALTER TABLE wrong_metadata_compact RENAME COLUMN a1 TO b, RENAME COLUMN b1 to a SETTINGS replication_alter_partitions_sync = 0;
INSERT INTO wrong_metadata_compact VALUES (7, 8, 9);
SELECT * FROM wrong_metadata_compact ORDER by a1 FORMAT JSONEachRow;
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
{"a1":"7","b1":"8","c":"9"}
~~~~~~~
SYSTEM START MERGES wrong_metadata_compact;
SYSTEM SYNC REPLICA wrong_metadata_compact;
SELECT * FROM wrong_metadata_compact order by a FORMAT JSONEachRow;
{"b":"1","a":"2","c":"3"}
{"b":"4","a":"5","c":"6"}
{"b":"7","a":"8","c":"9"}
~~~~~~~

View File

@ -3,141 +3,101 @@
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./mergetree_mutations.lib
. "$CUR_DIR"/mergetree_mutations.lib
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS wrong_metadata"
function wait_column()
{
local table=$1 && shift
local column=$1 && shift
$CLICKHOUSE_CLIENT -n --query="CREATE TABLE wrong_metadata(
a UInt64,
b UInt64,
c UInt64
)
ENGINE ReplicatedMergeTree('/test/{database}/tables/wrong_metadata', '1')
ORDER BY tuple()
SETTINGS min_bytes_for_wide_part = 0"
for _ in {1..60}; do
result=$($CLICKHOUSE_CLIENT --query "SHOW CREATE TABLE $table")
if [[ $result == *"$column"* ]]; then
return 0
fi
sleep 0.1
done
$CLICKHOUSE_CLIENT --query="INSERT INTO wrong_metadata VALUES (1, 2, 3)"
echo "[$table] Cannot wait for column to appear" >&2
return 1
}
function wait_mutation_loaded()
{
local table=$1 && shift
local expr=$1 && shift
for _ in {1..60}; do
result=$($CLICKHOUSE_CLIENT --query "SELECT * FROM system.mutations WHERE table = '$table' AND database='$CLICKHOUSE_DATABASE'")
if [[ $result == *"$expr"* ]]; then
return 0
fi
sleep 0.1
done
$CLICKHOUSE_CLIENT --query="SYSTEM STOP MERGES wrong_metadata"
echo "[$table] Cannot wait mutation $expr" >&2
return 1
}
declare -A tables
tables["wrong_metadata"]="min_bytes_for_wide_part = 0"
tables["wrong_metadata_compact"]="min_bytes_for_wide_part = 10000000"
$CLICKHOUSE_CLIENT --query="ALTER TABLE wrong_metadata RENAME COLUMN a TO a1, RENAME COLUMN b to b1 SETTINGS replication_alter_partitions_sync = 0"
for table in "${!tables[@]}"; do
settings="${tables[$table]}"
counter=0 retries=60
I=0
while [[ $counter -lt $retries ]]; do
I=$((I + 1))
result=$($CLICKHOUSE_CLIENT --query "SHOW CREATE TABLE wrong_metadata")
if [[ $result == *"\`a1\` UInt64"* ]]; then
break;
fi
sleep 0.1
((++counter))
done
$CLICKHOUSE_CLIENT -n --query="
DROP TABLE IF EXISTS $table;
CREATE TABLE $table(
a UInt64,
b UInt64,
c UInt64
)
ENGINE ReplicatedMergeTree('/test/{database}/tables/$table', '1')
ORDER BY tuple()
SETTINGS $settings;
$CLICKHOUSE_CLIENT --query="SELECT * FROM wrong_metadata ORDER BY a1 FORMAT JSONEachRow"
INSERT INTO $table VALUES (1, 2, 3);
SYSTEM STOP MERGES $table;
$CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"
-- { echoOn }
SELECT 'ECHO_ALIGNMENT_FIX' FORMAT Null;
$CLICKHOUSE_CLIENT --query="INSERT INTO wrong_metadata VALUES (4, 5, 6)"
ALTER TABLE $table RENAME COLUMN a TO a1, RENAME COLUMN b to b1 SETTINGS replication_alter_partitions_sync = 0;
"
wait_column "$table" "\`a1\` UInt64" || exit 2
$CLICKHOUSE_CLIENT --query="SELECT * FROM wrong_metadata ORDER BY a1 FORMAT JSONEachRow"
$CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"
$CLICKHOUSE_CLIENT -n --query="
-- { echoOn }
SELECT 'ECHO_ALIGNMENT_FIX' FORMAT Null;
SELECT * FROM $table ORDER BY a1 FORMAT JSONEachRow;
INSERT INTO $table VALUES (4, 5, 6);
SELECT * FROM $table ORDER BY a1 FORMAT JSONEachRow;
$CLICKHOUSE_CLIENT --query="ALTER TABLE wrong_metadata RENAME COLUMN a1 TO b, RENAME COLUMN b1 to a SETTINGS replication_alter_partitions_sync = 0"
ALTER TABLE $table RENAME COLUMN a1 TO b, RENAME COLUMN b1 to a SETTINGS replication_alter_partitions_sync = 0;
"
counter=0 retries=60
I=0
while [[ $counter -lt $retries ]]; do
I=$((I + 1))
result=$($CLICKHOUSE_CLIENT --query "SELECT * FROM system.mutations WHERE table = 'wrong_metadata' AND database='${CLICKHOUSE_DATABASE}'")
if [[ $result == *"b1 TO a"* ]]; then
break;
fi
sleep 0.1
((++counter))
done
wait_mutation_loaded "$table" "b1 TO a" || exit 2
$CLICKHOUSE_CLIENT --query="INSERT INTO wrong_metadata VALUES (7, 8, 9)"
$CLICKHOUSE_CLIENT -n --query="
-- { echoOn }
SELECT 'ECHO_ALIGNMENT_FIX' FORMAT Null;
$CLICKHOUSE_CLIENT --query="SELECT * FROM wrong_metadata ORDER by a1 FORMAT JSONEachRow"
$CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"
INSERT INTO $table VALUES (7, 8, 9);
SELECT * FROM $table ORDER by a1 FORMAT JSONEachRow;
SYSTEM START MERGES $table;
SYSTEM SYNC REPLICA $table;
"
$CLICKHOUSE_CLIENT --query="SYSTEM START MERGES wrong_metadata"
wait_for_all_mutations "$table"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA wrong_metadata"
$CLICKHOUSE_CLIENT -n --query="
-- { echoOn }
SELECT 'ECHO_ALIGNMENT_FIX' FORMAT Null;
$CLICKHOUSE_CLIENT --query="SELECT * FROM wrong_metadata order by a FORMAT JSONEachRow"
$CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS wrong_metadata"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS wrong_metadata_compact"
$CLICKHOUSE_CLIENT -n --query="CREATE TABLE wrong_metadata_compact(
a UInt64,
b UInt64,
c UInt64
)
ENGINE ReplicatedMergeTree('/test/{database}/tables/wrong_metadata_compact', '1')
ORDER BY tuple()
SETTINGS min_bytes_for_wide_part = 10000000"
$CLICKHOUSE_CLIENT --query="INSERT INTO wrong_metadata_compact VALUES (1, 2, 3)"
$CLICKHOUSE_CLIENT --query="SYSTEM STOP MERGES wrong_metadata_compact"
$CLICKHOUSE_CLIENT --query="ALTER TABLE wrong_metadata_compact RENAME COLUMN a TO a1, RENAME COLUMN b to b1 SETTINGS replication_alter_partitions_sync = 0"
counter=0 retries=60
I=0
while [[ $counter -lt $retries ]]; do
I=$((I + 1))
result=$($CLICKHOUSE_CLIENT --query "SHOW CREATE TABLE wrong_metadata_compact")
if [[ $result == *"\`a1\` UInt64"* ]]; then
break;
fi
sleep 0.1
((++counter))
done
$CLICKHOUSE_CLIENT --query="SELECT * FROM wrong_metadata_compact ORDER BY a1 FORMAT JSONEachRow"
$CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"
$CLICKHOUSE_CLIENT --query="INSERT INTO wrong_metadata_compact VALUES (4, 5, 6)"
$CLICKHOUSE_CLIENT --query="SELECT * FROM wrong_metadata_compact ORDER BY a1 FORMAT JSONEachRow"
$CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"
$CLICKHOUSE_CLIENT --query="ALTER TABLE wrong_metadata_compact RENAME COLUMN a1 TO b, RENAME COLUMN b1 to a SETTINGS replication_alter_partitions_sync = 0"
counter=0 retries=60
I=0
while [[ $counter -lt $retries ]]; do
I=$((I + 1))
result=$($CLICKHOUSE_CLIENT --query "SELECT * FROM system.mutations WHERE table = 'wrong_metadata_compact' AND database='${CLICKHOUSE_DATABASE}'")
if [[ $result == *"b1 TO a"* ]]; then
break;
fi
sleep 0.1
((++counter))
done
$CLICKHOUSE_CLIENT --query="INSERT INTO wrong_metadata_compact VALUES (7, 8, 9)"
$CLICKHOUSE_CLIENT --query="SELECT * FROM wrong_metadata_compact ORDER by a1 FORMAT JSONEachRow"
$CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"
$CLICKHOUSE_CLIENT --query="SYSTEM START MERGES wrong_metadata_compact"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA wrong_metadata_compact"
$CLICKHOUSE_CLIENT --query="SELECT * FROM wrong_metadata_compact order by a FORMAT JSONEachRow"
$CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS wrong_metadata_compact"
SELECT * FROM $table order by a FORMAT JSONEachRow;
"
done |& grep -v -F -x -e '-- { echoOn }' -e " SELECT 'ECHO_ALIGNMENT_FIX' FORMAT Null;"

View File

@ -1,7 +1,5 @@
-- Tags: no-parallel
SET allow_experimental_database_replicated=1;
DROP DATABASE IF EXISTS replicated_database_params;
CREATE DATABASE replicated_database_params ENGINE = Replicated('some/path/' || currentDatabase() || '/replicated_database_params');

View File

@ -1,4 +1,3 @@
-- Tags: no-parallel
set allow_experimental_database_replicated=1;
create database replicated_db_no_args engine=Replicated; -- { serverError BAD_ARGUMENTS }

View File

@ -8,7 +8,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --allow_experimental_database_replicated=1 --query "CREATE DATABASE ${CLICKHOUSE_DATABASE}_db engine = Replicated('/clickhouse/databases/${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}/${CLICKHOUSE_DATABASE}_db', '{shard}', '{replica}')"
${CLICKHOUSE_CLIENT} --query "CREATE DATABASE ${CLICKHOUSE_DATABASE}_db engine = Replicated('/clickhouse/databases/${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}/${CLICKHOUSE_DATABASE}_db', '{shard}', '{replica}')"
# Non-replicated engines are allowed
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test (id UInt64) ENGINE = MergeTree() ORDER BY id AS SELECT 1"
# Replicated storages are forbidden

View File

@ -1,29 +0,0 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
TABLE="03000_traverse_shadow_system_data_path_table"
BACKUP="03000_traverse_shadow_system_data_path_backup"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE ${TABLE} (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='s3_cache';"
${CLICKHOUSE_CLIENT} --query="INSERT INTO ${TABLE} VALUES (0, 'data');"
${CLICKHOUSE_CLIENT} --query "SELECT count() > 0 FROM system.remote_data_paths WHERE disk_name = 's3_cache'"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE ${TABLE} FREEZE WITH NAME '${BACKUP}';"
${CLICKHOUSE_CLIENT} --query="DROP TABLE ${TABLE} SYNC;"
${CLICKHOUSE_CLIENT} --query "
SELECT count() > 0
FROM system.remote_data_paths
WHERE disk_name = 's3_cache' AND local_path LIKE '%shadow/${BACKUP}%'
SETTINGS traverse_shadow_remote_data_paths=1;"
${CLICKHOUSE_CLIENT} --query "SYSTEM UNFREEZE WITH NAME '${BACKUP}';" &>/dev/null || true

View File

@ -0,0 +1,20 @@
-- Tags: no-replicated-database, no-fasttest
DROP TABLE IF EXISTS 03000_traverse_shadow_system_data_path_table;
CREATE TABLE 03000_traverse_shadow_system_data_path_table (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='s3_cache';
INSERT INTO 03000_traverse_shadow_system_data_path_table VALUES (0, 'data');
ALTER TABLE 03000_traverse_shadow_system_data_path_table FREEZE WITH NAME '03000_traverse_shadow_system_data_path_table_backup';
SELECT count() > 0
FROM system.remote_data_paths
WHERE disk_name = 's3_cache' AND local_path LIKE '%shadow/03000_traverse_shadow_system_data_path_table_backup%'
SETTINGS traverse_shadow_remote_data_paths=1;
DROP TABLE IF EXISTS 03000_traverse_shadow_system_data_path_table;

View File

@ -0,0 +1,4 @@
1
1
2
2

View File

@ -0,0 +1,13 @@
CREATE TABLE users (uid Int16, name Nullable(String), age Int16) ENGINE=Memory;
INSERT INTO users VALUES (1231, 'John', 33);
INSERT INTO users VALUES (6666, Null, 48);
INSERT INTO users VALUES (8888, 'Alice', 50);
SELECT count(name) FILTER (WHERE uid > 2000) FROM users;
SELECT countIf(name, uid > 2000) FROM users;
SELECT count(*) FILTER (WHERE uid > 2000) FROM users;
SELECT countIf(uid > 2000) FROM users;
DROP TABLE users;

View File

@ -0,0 +1,10 @@
-- { echoOn }
SELECT shardNum(), count() FROM dt WHERE (tag_id, tag_name) IN ((1, 'foo1'), (1, 'foo2')) GROUP BY 1 ORDER BY 1;
1 2
2 2
SELECT shardNum(), count() FROM dt WHERE tag_id IN (1, 1) AND tag_name IN ('foo1', 'foo2') GROUP BY 1 ORDER BY 1;
1 2
2 2
SELECT shardNum(), count() FROM dt WHERE tag_id = 1 AND tag_name IN ('foo1', 'foo2') GROUP BY 1 ORDER BY 1;
1 2
2 2

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS t;
DROP TABLE IF EXISTS dt;
CREATE TABLE t (tag_id UInt64, tag_name String) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE dt AS t ENGINE = Distributed('test_cluster_two_shards_localhost', currentDatabase(), 't', cityHash64(concat(tag_id, tag_name)));
INSERT INTO dt SETTINGS distributed_foreground_insert=1 VALUES (1, 'foo1'); -- shard0
INSERT INTO dt SETTINGS distributed_foreground_insert=1 VALUES (1, 'foo2'); -- shard1
SET optimize_skip_unused_shards=1, optimize_skip_unused_shards_rewrite_in=1;
-- { echoOn }
SELECT shardNum(), count() FROM dt WHERE (tag_id, tag_name) IN ((1, 'foo1'), (1, 'foo2')) GROUP BY 1 ORDER BY 1;
SELECT shardNum(), count() FROM dt WHERE tag_id IN (1, 1) AND tag_name IN ('foo1', 'foo2') GROUP BY 1 ORDER BY 1;
SELECT shardNum(), count() FROM dt WHERE tag_id = 1 AND tag_name IN ('foo1', 'foo2') GROUP BY 1 ORDER BY 1;

View File

@ -0,0 +1,17 @@
-- Tags: long
CREATE TABLE test_gcd(test_col UInt32 CODEC(GCD, LZ4))
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS index_granularity = 8192, index_granularity_bytes = 1024;
INSERT INTO test_gcd SELECT floor(randUniform(1, 3)) FROM numbers(150000);
OPTIMIZE TABLE test_gcd FINAL;
CREATE TABLE test_gcd2(test_col UInt32 CODEC(GCD, LZ4))
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS index_granularity = 8192, index_granularity_bytes = 1024, min_bytes_for_wide_part = 0, max_compress_block_size = 1024, min_compress_block_size = 1024;
INSERT INTO test_gcd2 SELECT floor(randUniform(1, 3)) FROM numbers(150000);
OPTIMIZE TABLE test_gcd2 FINAL;

View File

@ -0,0 +1,2 @@
278926179
278926179

View File

@ -0,0 +1,30 @@
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS t0;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS t3;
DROP TABLE IF EXISTS t4;
CREATE TABLE t0 (c0 String) ENGINE = Memory() ;
CREATE TABLE IF NOT EXISTS t1 (c0 Int32, c1 Int32, c2 ALIAS c1) ENGINE = Log() ;
CREATE TABLE t2 (c0 Int32) ENGINE = MergeTree() ORDER BY tuple() ;
CREATE TABLE t3 (c0 String) ENGINE = Memory() ;
CREATE TABLE t4 (c0 Int32) ENGINE = Memory() ;
INSERT INTO t4(c0) VALUES (-405831124);
INSERT INTO t1(c1, c0) VALUES (278926179, 891140511);
INSERT INTO t4(c0) VALUES (1586457527);
INSERT INTO t3(c0) VALUES ('?/|D!6 '), ('1586457527');
INSERT INTO t2(c0) VALUES (1475250982);
SELECT t1.c1
FROM t3, t1
WHERE true AND t1.c2
UNION ALL
SELECT t1.c1
FROM t3, t1
WHERE NOT t1.c2
UNION ALL
SELECT t1.c1
FROM t3, t1
WHERE t1.c2 IS NULL;

View File

@ -0,0 +1,2 @@
OK
OK

View File

@ -0,0 +1,43 @@
#!/usr/bin/env bash
# Checks that "clickhouse-client/local --help" prints a brief summary of CLI arguments and "--help --verbose" prints all possible CLI arguments
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Unique identifier for concurrent execution
PID=$$
# Get the help message in short and verbose form and put them into txt files
$CLICKHOUSE_CLIENT --help > "help_msg_$PID.txt"
$CLICKHOUSE_CLIENT --help --verbose > "verbose_help_msg_$PID.txt"
# Sizes of files
size_short=$(stat -c %s "help_msg_$PID.txt")
size_verbose=$(stat -c %s "verbose_help_msg_$PID.txt")
# If the short help message is smaller than the verbose one, everything is OK
if [ $size_short -lt $size_verbose ]; then
echo "OK"
else
echo "Not OK"
fi
rm "help_msg_$PID.txt"
rm "verbose_help_msg_$PID.txt"
# The same for clickhouse local
$CLICKHOUSE_LOCAL --help > "help_msg_$PID.txt"
$CLICKHOUSE_LOCAL --help --verbose > "verbose_help_msg_$PID.txt"
size_short=$(stat -c %s "help_msg_$PID.txt")
size_verbose=$(stat -c %s "verbose_help_msg_$PID.txt")
if [ $size_short -lt $size_verbose ]; then
echo "OK"
else
echo "Not OK"
fi
rm "help_msg_$PID.txt"
rm "verbose_help_msg_$PID.txt"

View File

@ -29,7 +29,6 @@ function wait_for_all_mutations()
for i in {1..200}
do
sleep 1
if [[ $(${CLICKHOUSE_CLIENT} --query="SELECT coalesce(minOrNull(is_done), 1) FROM system.mutations WHERE database='$database' AND table like '$table'") -eq 1 ]]; then
break
fi
@ -38,6 +37,7 @@ function wait_for_all_mutations()
echo "Timed out while waiting for mutation to execute!"
fi
sleep 0.1
done
}

Some files were not shown because too many files have changed in this diff