Merge branch 'master' into merging_53304

Alexander Tokmakov 2023-08-16 13:13:30 +02:00
commit 1dcdb7ee46
26 changed files with 336 additions and 231 deletions

View File

@@ -12,6 +12,7 @@ ENV \
# install systemd packages
RUN apt-get update && \
apt-get install -y --no-install-recommends \
sudo \
systemd \
&& \
apt-get clean && \

View File

@@ -776,8 +776,12 @@ namespace
UInt64 key = 0;
auto * dst = reinterpret_cast<char *>(&key);
const auto ref = cache.from_column->getDataAt(i);
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunreachable-code"
if constexpr (std::endian::native == std::endian::big)
dst += sizeof(key) - ref.size;
#pragma clang diagnostic pop
memcpy(dst, ref.data, ref.size);
table[key] = i;
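For intuition, a minimal Python sketch (mine, not part of the commit) of what the pointer offset above achieves: the copied bytes must land on the least-significant end of the UInt64 key, which is the first ref.size bytes on a little-endian target but the last ref.size bytes on a big-endian one.

import struct

def pack_key(data: bytes, endian: str) -> int:
    buf = bytearray(8)                      # zero-initialized UInt64
    if endian == "big":
        buf[8 - len(data):] = data          # dst += sizeof(key) - ref.size
    else:
        buf[:len(data)] = data              # dst stays at &key
    return struct.unpack(">Q" if endian == "big" else "<Q", bytes(buf))[0]

# Either way a 2-byte payload occupies only the low 16 bits of the key,
# so the unused high bytes stay zero on both platforms.
assert pack_key(b"\x01\x02", "little") < 2**16
assert pack_key(b"\x01\x02", "big") < 2**16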

View File

@@ -1780,7 +1780,8 @@ void IMergeTreeDataPart::renameToDetached(const String & prefix)
part_is_probably_removed_from_disk = true;
}
DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/) const
DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/,
const DiskTransactionPtr & disk_transaction) const
{
/// Avoid unneeded duplicates of broken parts if we try to detach the same broken part multiple times.
/// Otherwise it may pollute detached/ with dirs with _tryN suffix and we will fail to remove broken part after 10 attempts.
@@ -1795,7 +1796,8 @@ DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix
IDataPartStorage::ClonePartParams params
{
.copy_instead_of_hardlink = isStoredOnRemoteDiskWithZeroCopySupport() && storage.supportsReplication() && storage_settings->allow_remote_fs_zero_copy_replication,
.make_source_readonly = true
.make_source_readonly = true,
.external_transaction = disk_transaction
};
return getDataPartStorage().freeze(
storage.relative_data_path,

View File

@@ -371,7 +371,8 @@ public:
virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists);
/// Makes clone of a part in detached/ directory via hard links
virtual DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const;
virtual DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot,
const DiskTransactionPtr & disk_transaction) const;
/// Makes full clone of part in specified subdirectory (relative to storage data directory, e.g. "detached") on another disk
MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const;

View File

@@ -2619,8 +2619,50 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
if (detached_parts.empty())
return 0;
PartsTemporaryRename renamed_parts(*this, "detached/");
auto get_last_touched_time = [&](const DetachedPartInfo & part_info) -> time_t
{
auto path = fs::path(relative_data_path) / "detached" / part_info.dir_name;
time_t last_change_time = part_info.disk->getLastChanged(path);
time_t last_modification_time = part_info.disk->getLastModified(path).epochTime();
return std::max(last_change_time, last_modification_time);
};
time_t ttl_seconds = getSettings()->merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds;
size_t unfinished_deleting_parts = 0;
time_t current_time = time(nullptr);
for (const auto & part_info : detached_parts)
{
if (!part_info.dir_name.starts_with("deleting_"))
continue;
time_t startup_time = current_time - static_cast<time_t>(Context::getGlobalContextInstance()->getUptimeSeconds());
time_t last_touch_time = get_last_touched_time(part_info);
/// Maybe it's being deleted right now (for example, in ALTER DROP DETACHED)
bool had_restart = last_touch_time < startup_time;
bool ttl_expired = last_touch_time + ttl_seconds <= current_time;
if (!had_restart && !ttl_expired)
continue;
/// We were trying to delete this detached part but did not finish deleting, probably because the server crashed
LOG_INFO(log, "Removing detached part {} that we failed to remove previously", part_info.dir_name);
try
{
removeDetachedPart(part_info.disk, fs::path(relative_data_path) / "detached" / part_info.dir_name / "", part_info.dir_name);
++unfinished_deleting_parts;
}
catch (...)
{
tryLogCurrentException(log);
}
}
if (!getSettings()->merge_tree_enable_clear_old_broken_detached)
return unfinished_deleting_parts;
const auto full_path = fs::path(relative_data_path) / "detached";
size_t removed_count = 0;
for (const auto & part_info : detached_parts)
{
if (!part_info.valid_name || part_info.prefix.empty())
@@ -2635,31 +2677,24 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
if (!can_be_removed_by_timeout)
continue;
time_t current_time = time(nullptr);
ssize_t threshold = current_time - getSettings()->merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds;
auto path = fs::path(relative_data_path) / "detached" / part_info.dir_name;
time_t last_change_time = part_info.disk->getLastChanged(path);
time_t last_modification_time = part_info.disk->getLastModified(path).epochTime();
time_t last_touch_time = std::max(last_change_time, last_modification_time);
ssize_t threshold = current_time - ttl_seconds;
time_t last_touch_time = get_last_touched_time(part_info);
if (last_touch_time == 0 || last_touch_time >= threshold)
continue;
renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name, part_info.disk);
}
const String & old_name = part_info.dir_name;
String new_name = "deleting_" + part_info.dir_name;
part_info.disk->moveFile(fs::path(full_path) / old_name, fs::path(full_path) / new_name);
LOG_INFO(log, "Will clean up {} detached parts", renamed_parts.old_and_new_names.size());
renamed_parts.tryRenameAll();
for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
removeDetachedPart(part_info.disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_WARNING(log, "Removed broken detached part {} due to a timeout for broken detached parts", old_name);
old_name.clear();
++removed_count;
}
return renamed_parts.old_and_new_names.size();
LOG_INFO(log, "Cleaned up {} detached parts", removed_count);
return removed_count + unfinished_deleting_parts;
}
size_t MergeTreeData::clearOldWriteAheadLogs()
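The first hunk above adds a crash-recovery pass: a directory already renamed to deleting_ is deleted again when it predates the current server start (the previous delete was interrupted) or when its TTL has expired. A hedged Python sketch of that decision, with names of my own choosing:

import time

def should_retry_delete(last_touch: float, uptime_s: float, ttl_s: float) -> bool:
    now = time.time()
    startup_time = now - uptime_s            # when the server last started
    had_restart = last_touch < startup_time  # delete was interrupted by a restart
    ttl_expired = last_touch + ttl_s <= now
    return had_restart or ttl_expired

# Touched 2h ago with only 1h of uptime: retried even though the 24h TTL
# has not expired, because the unfinished delete predates this server start.
assert should_retry_delete(time.time() - 7200, uptime_s=3600, ttl_s=86400)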
@@ -4035,7 +4070,7 @@ void MergeTreeData::restoreAndActivatePart(const DataPartPtr & part, DataPartsLo
void MergeTreeData::outdateUnexpectedPartAndCloneToDetached(const DataPartPtr & part_to_detach)
{
LOG_INFO(log, "Cloning part {} to unexpected_{} and making it obsolete.", part_to_detach->getDataPartStorage().getPartDirectory(), part_to_detach->name);
part_to_detach->makeCloneInDetached("unexpected", getInMemoryMetadataPtr());
part_to_detach->makeCloneInDetached("unexpected", getInMemoryMetadataPtr(), /*disk_transaction*/ {});
DataPartsLock lock = lockParts();
part_to_detach->is_unexpected_local_part = true;
@@ -5797,18 +5832,21 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
{
const String source_dir = "detached/";
std::map<String, DiskPtr> name_to_disk;
/// Let's compose a list of parts that should be added.
if (attach_part)
{
const String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
validateDetachedPartName(part_id);
auto disk = getDiskForDetachedPart(part_id);
renamed_parts.addPart(part_id, "attaching_" + part_id, disk);
if (MergeTreePartInfo::tryParsePartName(part_id, format_version))
name_to_disk[part_id] = getDiskForDetachedPart(part_id);
if (temporary_parts.contains(String(DETACHED_DIR_NAME) + "/" + part_id))
{
LOG_WARNING(log, "Will not try to attach part {} because its directory is temporary, "
"probably it's being detached right now", part_id);
}
else
{
auto disk = getDiskForDetachedPart(part_id);
renamed_parts.addPart(part_id, "attaching_" + part_id, disk);
}
}
else
{
@@ -5825,6 +5863,12 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
for (const auto & part_info : detached_parts)
{
if (temporary_parts.contains(String(DETACHED_DIR_NAME) + "/" + part_info.dir_name))
{
LOG_WARNING(log, "Will not try to attach part {} because its directory is temporary, "
"probably it's being detached right now", part_info.dir_name);
continue;
}
LOG_DEBUG(log, "Found part {}", part_info.dir_name);
active_parts.add(part_info.dir_name);
}
@@ -5835,6 +5879,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
for (const auto & part_info : detached_parts)
{
const String containing_part = active_parts.getContainingPart(part_info.dir_name);
if (containing_part.empty())
continue;
LOG_DEBUG(log, "Found containing part {} for part {}", containing_part, part_info.dir_name);

View File

@@ -17,6 +17,7 @@ namespace DB
namespace ErrorCodes
{
extern const int DIRECTORY_ALREADY_EXISTS;
extern const int NOT_IMPLEMENTED;
}
MergeTreeDataPartInMemory::MergeTreeDataPartInMemory(
@@ -138,8 +139,12 @@ MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String &
return new_data_part_storage;
}
DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const
DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix,
const StorageMetadataPtr & metadata_snapshot,
const DiskTransactionPtr & disk_transaction) const
{
if (disk_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "InMemory parts are not compatible with disk transactions");
String detached_path = *getRelativePathForDetachedPart(prefix, /* broken */ false);
return flushToDisk(detached_path, metadata_snapshot);
}

View File

@@ -42,7 +42,8 @@ public:
bool hasColumnFiles(const NameAndTypePair & column) const override { return !!getColumnPosition(column.getNameInStorage()); }
String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return ""; }
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) override;
DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const override;
DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot,
const DiskTransactionPtr & disk_transaction) const override;
std::optional<time_t> getColumnModificationTime(const String & /* column_name */) const override { return {}; }
MutableDataPartStoragePtr flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const;

View File

@@ -149,8 +149,7 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate()
/// do it under share lock
cleaned_other += storage.clearOldWriteAheadLogs();
cleaned_part_like += storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
cleaned_part_like += storage.clearOldBrokenPartsFromDetachedDirectory();
cleaned_part_like += storage.clearOldBrokenPartsFromDetachedDirectory();
}
/// This is loose condition: no problem if we actually had lost leadership at this moment

View File

@@ -633,8 +633,8 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
delayed_chunk.reset();
}
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
template<>
bool ReplicatedMergeTreeSinkImpl<false>::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
/// NOTE: No delay in this case. That's Ok.
auto origin_zookeeper = storage.getZooKeeper();
@@ -649,8 +649,13 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::writeExistingPart(MergeTreeData:
try
{
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
commitPart(zookeeper, part, BlockIDsType(), replicas_num, true);
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()));
String block_id = deduplicate ? fmt::format("{}_{}", part->info.partition_id, part->checksums.getTotalChecksumHex()) : "";
bool deduplicated = commitPart(zookeeper, part, block_id, replicas_num, /* writing_existing_part */ true).second;
/// Set a special error code if the block is duplicate
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()), ExecutionStatus(error));
return deduplicated;
}
catch (...)
{
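The rewritten writeExistingPart derives a deduplication block id from the partition id plus the part's total checksum, so re-attaching a byte-identical part is detected and reported via INSERT_WAS_DEDUPLICATED. A sketch of the id's shape (Python; sha256 is a stand-in for ClickHouse's part checksum, not the real algorithm):

import hashlib

def attach_block_id(partition_id: str, part_payload: bytes) -> str:
    # stand-in for part->checksums.getTotalChecksumHex()
    checksum_hex = hashlib.sha256(part_payload).hexdigest().upper()
    return f"{partition_id}_{checksum_hex}"

# The same bytes in the same partition always produce the same block id.
assert attach_block_id("202308", b"part") == attach_block_id("202308", b"part")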

View File

@@ -56,7 +56,7 @@ public:
String getName() const override { return "ReplicatedMergeTreeSink"; }
/// For ATTACHing existing data on filesystem.
void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
bool writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
/// For proper deduplication in MaterializedViews
bool lastBlockIsDuplicate() const override

View File

@@ -1379,8 +1379,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
cleared_count += clearOldWriteAheadLogs();
cleared_count += clearOldMutations();
cleared_count += clearEmptyParts();
if (getSettings()->merge_tree_enable_clear_old_broken_detached)
cleared_count += clearOldBrokenPartsFromDetachedDirectory();
cleared_count += clearOldBrokenPartsFromDetachedDirectory();
return cleared_count;
/// TODO maybe take into account number of cleared objects when calculating backoff
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);
@@ -1811,8 +1810,10 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt
if (detach)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
String part_dir = part->getDataPartStorage().getPartDirectory();
LOG_INFO(log, "Detaching {}", part_dir);
auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
}
{
@@ -1894,8 +1895,10 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
for (const auto & part : parts)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
String part_dir = part->getDataPartStorage().getPartDirectory();
LOG_INFO(log, "Detaching {}", part_dir);
auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
}
}
@@ -1935,8 +1938,10 @@ void StorageMergeTree::dropPartsImpl(DataPartsVector && parts_to_remove, bool de
/// NOTE: no race with background cleanup until we hold pointers to parts
for (const auto & part : parts_to_remove)
{
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
String part_dir = part->getDataPartStorage().getPartDirectory();
LOG_INFO(log, "Detaching {}", part_dir);
auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
}
}

View File

@@ -2097,8 +2097,10 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
{
if (auto part_to_detach = part.getPartIfItWasActive())
{
LOG_INFO(log, "Detaching {}", part_to_detach->getDataPartStorage().getPartDirectory());
part_to_detach->makeCloneInDetached("", metadata_snapshot);
String part_dir = part_to_detach->getDataPartStorage().getPartDirectory();
LOG_INFO(log, "Detaching {}", part_dir);
auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
part_to_detach->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
}
}
}
@@ -2828,7 +2830,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
for (const auto & part : parts_to_remove_from_working_set)
{
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("clone", metadata_snapshot);
part->makeCloneInDetached("clone", metadata_snapshot, /*disk_transaction*/ {});
}
}
@@ -3794,12 +3796,12 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
chassert(!broken_part);
chassert(!storage_init);
part->was_removed_as_broken = true;
part->makeCloneInDetached("broken", getInMemoryMetadataPtr());
part->makeCloneInDetached("broken", getInMemoryMetadataPtr(), /*disk_transaction*/ {});
broken_part = part;
}
else
{
part->makeCloneInDetached("covered-by-broken", getInMemoryMetadataPtr());
part->makeCloneInDetached("covered-by-broken", getInMemoryMetadataPtr(), /*disk_transaction*/ {});
}
detached_parts.push_back(part->name);
}
@@ -6133,8 +6135,9 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
/// TODO Allow to use quorum here.
ReplicatedMergeTreeSink output(*this, metadata_snapshot, 0, 0, 0, false, false, false, query_context,
/*is_attach*/true);
ReplicatedMergeTreeSink output(*this, metadata_snapshot, /* quorum */ 0, /* quorum_timeout_ms */ 0, /* max_parts_per_block */ 0,
/* quorum_parallel */ false, query_context->getSettingsRef().insert_deduplicate,
/* majority_quorum */ false, query_context, /*is_attach*/true);
for (size_t i = 0; i < loaded_parts.size(); ++i)
{

View File

@@ -5,24 +5,6 @@ test_distributed_ddl/test.py::test_default_database[configs_secure]
test_distributed_ddl/test.py::test_on_server_fail[configs]
test_distributed_ddl/test.py::test_on_server_fail[configs_secure]
test_distributed_insert_backward_compatibility/test.py::test_distributed_in_tuple
test_distributed_inter_server_secret/test.py::test_per_user_inline_settings_secure_cluster[default-]
test_distributed_inter_server_secret/test.py::test_per_user_inline_settings_secure_cluster[nopass-]
test_distributed_inter_server_secret/test.py::test_per_user_inline_settings_secure_cluster[pass-foo]
test_distributed_inter_server_secret/test.py::test_per_user_protocol_settings_secure_cluster[default-]
test_distributed_inter_server_secret/test.py::test_per_user_protocol_settings_secure_cluster[nopass-]
test_distributed_inter_server_secret/test.py::test_per_user_protocol_settings_secure_cluster[pass-foo]
test_distributed_inter_server_secret/test.py::test_user_insecure_cluster[default-]
test_distributed_inter_server_secret/test.py::test_user_insecure_cluster[nopass-]
test_distributed_inter_server_secret/test.py::test_user_insecure_cluster[pass-foo]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster[default-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster[nopass-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster[pass-foo]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_from_backward[default-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_from_backward[nopass-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_from_backward[pass-foo]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_with_backward[default-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_with_backward[nopass-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_with_backward[pass-foo]
test_distributed_load_balancing/test.py::test_distributed_replica_max_ignored_errors
test_distributed_load_balancing/test.py::test_load_balancing_default
test_distributed_load_balancing/test.py::test_load_balancing_priority_round_robin[dist_priority]

View File

@@ -145,10 +145,12 @@ def main():
ci_logs_password = os.getenv(
"CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD"
)
subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}' '{main_log_path}'",
shell=True,
)
if ci_logs_host not in ("CLICKHOUSE_CI_LOGS_HOST", ""):
subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}' '{main_log_path}'",
shell=True,
)
check_name_lower = (
check_name.lower().replace("(", "").replace(")", "").replace(" ", "")
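The guard above (the same fix recurs in the two CI scripts further down) skips the sed rewrite when CLICKHOUSE_CI_LOGS_HOST is unset or still equal to its placeholder default, so the placeholder string is never itself substituted through the logs. A condensed Python sketch of the pattern, with a helper name of my own:

import os
import subprocess

def mask_ci_credentials(paths: list) -> None:
    host = os.getenv("CLICKHOUSE_CI_LOGS_HOST", "CLICKHOUSE_CI_LOGS_HOST")
    password = os.getenv("CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD")
    if host in ("CLICKHOUSE_CI_LOGS_HOST", ""):
        return  # nothing real to mask
    quoted = "' '".join(paths)
    subprocess.check_call(
        f"sed -i -r -e 's!{host}!CLICKHOUSE_CI_LOGS_HOST!g; "
        f"s!{password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{quoted}'",
        shell=True,
    )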

View File

@@ -394,10 +394,11 @@ def main():
ci_logs_password = os.getenv(
"CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD"
)
subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}'",
shell=True,
)
if ci_logs_host not in ("CLICKHOUSE_CI_LOGS_HOST", ""):
subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}'",
shell=True,
)
report_url = upload_results(
s3_helper,

View File

@@ -50,8 +50,19 @@ def prepare_test_scripts():
server_test = r"""#!/bin/bash
set -e
trap "bash -ex /packages/preserve_logs.sh" ERR
test_env='TEST_THE_DEFAULT_PARAMETER=15'
echo "$test_env" >> /etc/default/clickhouse
systemctl start clickhouse-server
clickhouse-client -q 'SELECT version()'"""
clickhouse-client -q 'SELECT version()'
grep "$test_env" /proc/$(cat /var/run/clickhouse-server/clickhouse-server.pid)/environ"""
initd_test = r"""#!/bin/bash
set -e
trap "bash -ex /packages/preserve_logs.sh" ERR
test_env='TEST_THE_DEFAULT_PARAMETER=15'
echo "$test_env" >> /etc/default/clickhouse
/etc/init.d/clickhouse-server start
clickhouse-client -q 'SELECT version()'
grep "$test_env" /proc/$(cat /var/run/clickhouse-server/clickhouse-server.pid)/environ"""
keeper_test = r"""#!/bin/bash
set -e
trap "bash -ex /packages/preserve_logs.sh" ERR
@@ -102,6 +113,7 @@ chmod a+rw -R /tests_logs
exit 1
"""
(TEMP_PATH / "server_test.sh").write_text(server_test, encoding="utf-8")
(TEMP_PATH / "initd_test.sh").write_text(initd_test, encoding="utf-8")
(TEMP_PATH / "keeper_test.sh").write_text(keeper_test, encoding="utf-8")
(TEMP_PATH / "binary_test.sh").write_text(binary_test, encoding="utf-8")
(TEMP_PATH / "preserve_logs.sh").write_text(preserve_logs, encoding="utf-8")
@@ -112,6 +124,9 @@ def test_install_deb(image: DockerImage) -> TestResults:
"Install server deb": r"""#!/bin/bash -ex
apt-get install /packages/clickhouse-{server,client,common}*deb
bash -ex /packages/server_test.sh""",
"Run server init.d": r"""#!/bin/bash -ex
apt-get install /packages/clickhouse-{server,client,common}*deb
bash -ex /packages/initd_test.sh""",
"Install keeper deb": r"""#!/bin/bash -ex
apt-get install /packages/clickhouse-keeper*deb
bash -ex /packages/keeper_test.sh""",
@@ -191,6 +206,9 @@ def test_install(image: DockerImage, tests: Dict[str, str]) -> TestResults:
retcode = process.wait()
if retcode == 0:
status = OK
subprocess.check_call(
f"docker kill -s 9 {container_id}", shell=True
)
break
status = FAIL
@@ -198,8 +216,8 @@ def test_install(image: DockerImage, tests: Dict[str, str]) -> TestResults:
archive_path = TEMP_PATH / f"{container_name}-{retry}.tar.gz"
compress_fast(LOGS_PATH, archive_path)
logs.append(archive_path)
subprocess.check_call(f"docker kill -s 9 {container_id}", shell=True)
subprocess.check_call(f"docker kill -s 9 {container_id}", shell=True)
test_results.append(TestResult(name, status, stopwatch.duration_seconds, logs))
return test_results
@@ -276,7 +294,7 @@ def main():
sys.exit(0)
docker_images = {
name: get_image_with_version(REPORTS_PATH, name)
name: get_image_with_version(REPORTS_PATH, name, args.download)
for name in (RPM_IMAGE, DEB_IMAGE)
}
prepare_test_scripts()
@@ -293,6 +311,8 @@ def main():
is_match = is_match or path.endswith(".rpm")
if args.tgz:
is_match = is_match or path.endswith(".tgz")
# We don't need debug packages, so let's filter them out
is_match = is_match and "-dbg" not in path
return is_match
download_builds_filter(

View File

@@ -209,10 +209,11 @@ def run_stress_test(docker_image_name):
ci_logs_password = os.getenv(
"CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD"
)
subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}'",
shell=True,
)
if ci_logs_host not in ("CLICKHOUSE_CI_LOGS_HOST", ""):
subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}'",
shell=True,
)
report_url = upload_results(
s3_helper,

View File

@@ -57,27 +57,30 @@ def remove_broken_detached_part_impl(table, node, expect_broken_prefix):
]
)
node.exec_in_container(["mkdir", f"{path_to_detached}../unexpected_all_42_1337_5"])
node.exec_in_container(
[
"touch",
"-t",
"1312031429.30",
f"{path_to_detached}../unexpected_all_42_1337_5",
]
)
result = node.exec_in_container(
["stat", f"{path_to_detached}../unexpected_all_42_1337_5"]
)
print(result)
assert "Modify: 2013-12-03" in result
node.exec_in_container(
[
"mv",
f"{path_to_detached}../unexpected_all_42_1337_5",
f"{path_to_detached}unexpected_all_42_1337_5",
]
)
for name in [
"unexpected_all_42_1337_5",
"deleting_all_123_456_7",
"covered-by-broken_all_12_34_5",
]:
node.exec_in_container(["mkdir", f"{path_to_detached}../{name}"])
node.exec_in_container(
[
"touch",
"-t",
"1312031429.30",
f"{path_to_detached}../{name}",
]
)
result = node.exec_in_container(["stat", f"{path_to_detached}../{name}"])
print(result)
assert "Modify: 2013-12-03" in result
node.exec_in_container(
[
"mv",
f"{path_to_detached}../{name}",
f"{path_to_detached}{name}",
]
)
result = node.query(
f"CHECK TABLE {table}", settings={"check_query_single_value_result": 0}
@@ -87,17 +90,20 @@ def remove_broken_detached_part_impl(table, node, expect_broken_prefix):
node.query(f"DETACH TABLE {table}")
node.query(f"ATTACH TABLE {table}")
result = node.exec_in_container(["ls", path_to_detached])
print(result)
assert f"{expect_broken_prefix}_all_3_3_0" in result
assert "all_1_1_0" in result
assert "trash" in result
assert "broken_all_fake" in result
assert "unexpected_all_42_1337_5" in result
time.sleep(15)
assert node.contains_in_log(
"Removed broken detached part unexpected_all_42_1337_5 due to a timeout"
node.wait_for_log_line(
"Removing detached part deleting_all_123_456_7",
timeout=90,
look_behind_lines=1000000,
)
node.wait_for_log_line(
f"Removed broken detached part {expect_broken_prefix}_all_3_3_0 due to a timeout",
timeout=10,
look_behind_lines=1000000,
)
node.wait_for_log_line(
"Removed broken detached part unexpected_all_42_1337_5 due to a timeout",
timeout=10,
look_behind_lines=1000000,
)
result = node.exec_in_container(["ls", path_to_detached])
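The fixed time.sleep(15) plus contains_in_log is replaced by wait_for_log_line, which polls until the expected line shows up or the timeout elapses. A minimal sketch of the polling pattern (mine, not the integration framework's implementation):

import time
from pathlib import Path

def wait_for_log_line(log_path: str, needle: str, timeout: float) -> None:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if needle in Path(log_path).read_text(errors="replace"):
            return
        time.sleep(0.5)
    raise TimeoutError(f"{needle!r} not found in {log_path} within {timeout}s")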
@@ -106,7 +112,16 @@ def remove_broken_detached_part_impl(table, node, expect_broken_prefix):
assert "all_1_1_0" in result
assert "trash" in result
assert "broken_all_fake" in result
assert "covered-by-broken_all_12_34_5" in result
assert "unexpected_all_42_1337_5" not in result
assert "deleting_all_123_456_7" not in result
node.query(
f"ALTER TABLE {table} DROP DETACHED PART 'covered-by-broken_all_12_34_5'",
settings={"allow_drop_detached": 1},
)
result = node.exec_in_container(["ls", path_to_detached])
assert "covered-by-broken_all_12_34_5" not in result
node.query(f"DROP TABLE {table} SYNC")

View File

@@ -110,10 +110,6 @@ def start_cluster():
cluster.shutdown()
def query_with_id(node, id_, query, **kwargs):
return node.query("WITH '{}' AS __id {}".format(id_, query), **kwargs)
# @return -- [user, initial_user]
def get_query_user_info(node, query_pattern):
node.query("SYSTEM FLUSH LOGS")
@@ -334,7 +330,7 @@ def test_secure_disagree_insert():
@users
def test_user_insecure_cluster(user, password):
id_ = "query-dist_insecure-" + user
query_with_id(n1, id_, "SELECT * FROM dist_insecure", user=user, password=password)
n1.query(f"SELECT *, '{id_}' FROM dist_insecure", user=user, password=password)
assert get_query_user_info(n1, id_) == [
user,
user,
@@ -345,7 +341,7 @@ def test_user_insecure_cluster(user, password):
@users
def test_user_secure_cluster(user, password):
id_ = "query-dist_secure-" + user
query_with_id(n1, id_, "SELECT * FROM dist_secure", user=user, password=password)
n1.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(n2, id_) == [user, user]
@@ -353,16 +349,14 @@ def test_user_secure_cluster(user, password):
@users
def test_per_user_inline_settings_insecure_cluster(user, password):
id_ = "query-ddl-settings-dist_insecure-" + user
query_with_id(
n1,
id_,
"""
SELECT * FROM dist_insecure
SETTINGS
prefer_localhost_replica=0,
max_memory_usage_for_user=1e9,
max_untracked_memory=0
""",
n1.query(
f"""
SELECT *, '{id_}' FROM dist_insecure
SETTINGS
prefer_localhost_replica=0,
max_memory_usage_for_user=1e9,
max_untracked_memory=0
""",
user=user,
password=password,
)
@@ -372,16 +366,14 @@ def test_per_user_inline_settings_insecure_cluster(user, password):
@users
def test_per_user_inline_settings_secure_cluster(user, password):
id_ = "query-ddl-settings-dist_secure-" + user
query_with_id(
n1,
id_,
"""
SELECT * FROM dist_secure
SETTINGS
prefer_localhost_replica=0,
max_memory_usage_for_user=1e9,
max_untracked_memory=0
""",
n1.query(
f"""
SELECT *, '{id_}' FROM dist_secure
SETTINGS
prefer_localhost_replica=0,
max_memory_usage_for_user=1e9,
max_untracked_memory=0
""",
user=user,
password=password,
)
@@ -393,10 +385,8 @@ def test_per_user_inline_settings_secure_cluster(user, password):
@users
def test_per_user_protocol_settings_insecure_cluster(user, password):
id_ = "query-protocol-settings-dist_insecure-" + user
query_with_id(
n1,
id_,
"SELECT * FROM dist_insecure",
n1.query(
f"SELECT *, '{id_}' FROM dist_insecure",
user=user,
password=password,
settings={
@@ -411,10 +401,8 @@ def test_per_user_protocol_settings_insecure_cluster(user, password):
@users
def test_per_user_protocol_settings_secure_cluster(user, password):
id_ = "query-protocol-settings-dist_secure-" + user
query_with_id(
n1,
id_,
"SELECT * FROM dist_secure",
n1.query(
f"SELECT *, '{id_}' FROM dist_secure",
user=user,
password=password,
settings={
@@ -431,8 +419,8 @@ def test_per_user_protocol_settings_secure_cluster(user, password):
@users
def test_user_secure_cluster_with_backward(user, password):
id_ = "with-backward-query-dist_secure-" + user
query_with_id(
n1, id_, "SELECT * FROM dist_secure_backward", user=user, password=password
n1.query(
f"SELECT *, '{id_}' FROM dist_secure_backward", user=user, password=password
)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]
@@ -441,13 +429,7 @@ def test_user_secure_cluster_with_backward(user, password):
@users
def test_user_secure_cluster_from_backward(user, password):
id_ = "from-backward-query-dist_secure-" + user
query_with_id(
backward,
id_,
"SELECT * FROM dist_secure_backward",
user=user,
password=password,
)
backward.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]

View File

@@ -11,26 +11,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function query_with_retry
{
local query="$1" && shift
local retry=0
until [ $retry -ge 5 ]
do
local result
result="$($CLICKHOUSE_CLIENT "$@" --query="$query" 2>&1)"
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$((retry + 1))
sleep 3
fi
done
echo "Query '$query' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r1;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r2;"

View File

@@ -5,22 +5,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function query_with_retry
{
retry=0
until [ $retry -ge 5 ]
do
result=$($CLICKHOUSE_CLIENT $2 --query="$1" 2>&1)
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$(($retry + 1))
sleep 3
fi
done
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ttl_repl1"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ttl_repl2"

View File

@@ -7,23 +7,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function query_with_retry
{
retry=0
until [ $retry -ge 5 ]
do
result=$($CLICKHOUSE_CLIENT $2 --query="$1" 2>&1)
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$(($retry + 1))
sleep 3
fi
done
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst;"

View File

@@ -0,0 +1,4 @@
default begin inserts
default end inserts
20 210
20 210

View File

@@ -0,0 +1,76 @@
#!/usr/bin/env bash
# Tags: race, zookeeper, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./replication.lib
. "$CURDIR"/replication.lib
$CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS alter_table0;
DROP TABLE IF EXISTS alter_table1;
CREATE TABLE alter_table0 (a UInt8, b Int16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a;
CREATE TABLE alter_table1 (a UInt8, b Int16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a;
" || exit 1
function thread_detach()
{
while true; do
$CLICKHOUSE_CLIENT -mn -q "ALTER TABLE alter_table$(($RANDOM % 2)) DETACH PARTITION ID 'all'; SELECT sleep($RANDOM / 32000) format Null;" 2>/dev/null ||:
done
}
function thread_attach()
{
while true; do
$CLICKHOUSE_CLIENT -mn -q "ALTER TABLE alter_table$(($RANDOM % 2)) ATTACH PARTITION ID 'all'; SELECT sleep($RANDOM / 32000) format Null;" 2>/dev/null ||:
done
}
function insert()
{
$CLICKHOUSE_CLIENT -q "INSERT INTO alter_table$(($RANDOM % 2)) SELECT $RANDOM, $i" 2>/dev/null
}
thread_detach & PID_1=$!
thread_attach & PID_2=$!
thread_detach & PID_3=$!
thread_attach & PID_4=$!
function do_inserts()
{
for i in {1..20}; do
while ! insert; do $CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'retrying insert $i' FORMAT Null"; done
done
}
$CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'begin inserts'"
do_inserts 2>&1| grep -Fa "Exception: " | grep -Fv "was cancelled by concurrent ALTER PARTITION"
$CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'end inserts'"
kill -TERM $PID_1 && kill -TERM $PID_2 && kill -TERM $PID_3 && kill -TERM $PID_4
wait
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table0"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
query_with_retry "ALTER TABLE alter_table0 ATTACH PARTITION ID 'all'" 2>/dev/null;
$CLICKHOUSE_CLIENT -q "ALTER TABLE alter_table1 ATTACH PARTITION ID 'all'" 2>/dev/null
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
$CLICKHOUSE_CLIENT -q "ALTER TABLE alter_table1 ATTACH PARTITION ID 'all'"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table0"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
engine=$($CLICKHOUSE_CLIENT -q "SELECT engine FROM system.tables WHERE database=currentDatabase() AND table='alter_table0'")
if [[ "$engine" == "ReplicatedMergeTree" ]]; then
# ReplicatedMergeTree may duplicate data on ATTACH PARTITION (when one replica has a merged part and another replica has source parts only)
$CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE alter_table0 FINAL DEDUPLICATE"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
fi
$CLICKHOUSE_CLIENT -q "SELECT count(), sum(b) FROM alter_table0"
$CLICKHOUSE_CLIENT -q "SELECT count(), sum(b) FROM alter_table1"
$CLICKHOUSE_CLIENT -q "DROP TABLE alter_table0"
$CLICKHOUSE_CLIENT -q "DROP TABLE alter_table1"

View File

@@ -5,23 +5,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function query_with_retry
{
retry=0
until [ $retry -ge 5 ]
do
result=$($CLICKHOUSE_CLIENT $2 --query="$1" 2>&1)
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$(($retry + 1))
sleep 3
fi
done
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT -n --query "
DROP TABLE IF EXISTS load_parts_refcounts SYNC;

View File

@@ -155,3 +155,23 @@ function random_str()
local n=$1 && shift
tr -cd '[:lower:]' < /dev/urandom | head -c"$n"
}
function query_with_retry
{
local query="$1" && shift
local retry=0
until [ $retry -ge 5 ]
do
local result
result="$($CLICKHOUSE_CLIENT "$@" --query="$query" 2>&1)"
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$((retry + 1))
sleep 3
fi
done
echo "Query '$query' failed with '$result'"
}