Relax a test due to Poco regression.

Poco 1.9.3 has one second precision for mtime, because of this we can
miss an update of a dictionary file if it happens in the same second
we've read the file.

Should probably be fixed by switching to std::filesystem.

Also add some logs because finding out what's going on with dictionary
reloading is borderline impossible.
This commit is contained in:
Alexander Kuzmenkov 2020-03-19 05:17:30 +03:00
parent e38f70de65
commit 9d9ae00956
6 changed files with 130 additions and 9 deletions

View File

@ -1,16 +1,30 @@
#pragma once #pragma once
#include <chrono> #include <chrono>
#include <ctime>
#include <string> #include <string>
#include <common/DateLUT.h> #include <iomanip>
namespace ext namespace ext
{ {
inline std::string to_string(const std::time_t & time)
{
std::stringstream ss;
ss << std::put_time(std::localtime(&time), "%Y-%m-%d %X");
return ss.str();
}
template <typename Clock, typename Duration = typename Clock::duration> template <typename Clock, typename Duration = typename Clock::duration>
std::string to_string(const std::chrono::time_point<Clock, Duration> & tp) std::string to_string(const std::chrono::time_point<Clock, Duration> & tp)
{ {
return DateLUT::instance().timeToString(std::chrono::system_clock::to_time_t(tp)); // Don't use DateLUT because it shows weird characters for
// TimePoint::max(). I wish we could use C++20 format, but it's not
// there yet.
// return DateLUT::instance().timeToString(std::chrono::system_clock::to_time_t(tp));
auto in_time_t = std::chrono::system_clock::to_time_t(tp);
return to_string(in_time_t);
} }
template <typename Rep, typename Period = std::ratio<1>> template <typename Rep, typename Period = std::ratio<1>>

View File

@ -194,6 +194,8 @@ int ShellCommand::tryWait()
if (-1 == waitpid(pid, &status, 0)) if (-1 == waitpid(pid, &status, 0))
throwFromErrno("Cannot waitpid", ErrorCodes::CANNOT_WAITPID); throwFromErrno("Cannot waitpid", ErrorCodes::CANNOT_WAITPID);
LOG_TRACE(getLogger(), "Wait for shell command pid " << pid << " completed with status " << status);
if (WIFEXITED(status)) if (WIFEXITED(status))
return WEXITSTATUS(status); return WEXITSTATUS(status);

View File

@ -48,6 +48,7 @@ FileDictionarySource::FileDictionarySource(const FileDictionarySource & other)
BlockInputStreamPtr FileDictionarySource::loadAll() BlockInputStreamPtr FileDictionarySource::loadAll()
{ {
LOG_TRACE(&Poco::Logger::get("FileDictionary"), "loadAll " + toString());
auto in_ptr = std::make_unique<ReadBufferFromFile>(filepath); auto in_ptr = std::make_unique<ReadBufferFromFile>(filepath);
auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
last_modification = getLastModification(); last_modification = getLastModification();

View File

@ -39,7 +39,14 @@ public:
throw Exception{"Method loadKeys is unsupported for FileDictionarySource", ErrorCodes::NOT_IMPLEMENTED}; throw Exception{"Method loadKeys is unsupported for FileDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
} }
bool isModified() const override { return getLastModification() > last_modification; } bool isModified() const override
{
// We can't count on that the mtime increases or that it has
// a particular relation to system time, so just check for strict
// equality.
return getLastModification() != last_modification;
}
bool supportsSelectiveLoad() const override { return false; } bool supportsSelectiveLoad() const override { return false; }
///Not supported for FileDictionarySource ///Not supported for FileDictionarySource

View File

@ -257,13 +257,22 @@ private:
auto update_time_from_repository = repository.getUpdateTime(path); auto update_time_from_repository = repository.getUpdateTime(path);
/// Actually it can't be less, but for sure we check less or equal // We can't count on that the mtime increases or that it has
if (update_time_from_repository <= file_info.last_update_time) // a particular relation to system time, so just check for strict
// equality.
// Note that on 1.x versions on Poco, the granularity of update
// time is one second, so the window where we can miss the changes
// is that wide (i.e. when we read the file and after that it
// is updated, but in the same second).
// The solution to this is probably switching to std::filesystem
// -- the work is underway to do so.
if (update_time_from_repository == file_info.last_update_time)
{ {
file_info.in_use = true; file_info.in_use = true;
return false; return false;
} }
LOG_TRACE(log, "Loading config file '" << path << "'.");
auto file_contents = repository.load(path); auto file_contents = repository.load(path);
/// get all objects' definitions /// get all objects' definitions
@ -411,6 +420,8 @@ public:
if (configs == new_configs) if (configs == new_configs)
return; return;
LOG_TRACE(log, "Configuration of reloadable objects has changed");
configs = new_configs; configs = new_configs;
std::vector<String> removed_names; std::vector<String> removed_names;
@ -418,7 +429,10 @@ public:
{ {
auto new_config_it = new_configs->find(name); auto new_config_it = new_configs->find(name);
if (new_config_it == new_configs->end()) if (new_config_it == new_configs->end())
{
LOG_TRACE(log, "Reloadable object '" << name << "' is removed");
removed_names.emplace_back(name); removed_names.emplace_back(name);
}
else else
{ {
const auto & new_config = new_config_it->second; const auto & new_config = new_config_it->second;
@ -427,12 +441,17 @@ public:
if (!config_is_same) if (!config_is_same)
{ {
/// Configuration has been changed. /// Configuration has been changed.
LOG_TRACE(log, "Configuration has changed for reloadable "
"object '" << info.name << "'");
info.object_config = new_config; info.object_config = new_config;
if (info.triedToLoad()) if (info.triedToLoad())
{ {
/// The object has been tried to load before, so it is currently in use or was in use /// The object has been tried to load before, so it is currently in use or was in use
/// and we should try to reload it with the new config. /// and we should try to reload it with the new config.
LOG_TRACE(log, "Will reload '" << name << "'"
" because its configuration has changed and"
" there were attempts to load it before");
startLoading(info, true); startLoading(info, true);
} }
} }
@ -446,7 +465,11 @@ public:
{ {
Info & info = infos.emplace(name, Info{name, config}).first->second; Info & info = infos.emplace(name, Info{name, config}).first->second;
if (always_load_everything) if (always_load_everything)
{
LOG_TRACE(log, "Will reload new object '" << name << "'"
" because always_load_everything flag is set.");
startLoading(info); startLoading(info);
}
} }
} }
@ -640,6 +663,10 @@ public:
if (!should_update_flag) if (!should_update_flag)
{ {
info.next_update_time = calculateNextUpdateTime(info.object, info.error_count); info.next_update_time = calculateNextUpdateTime(info.object, info.error_count);
LOG_TRACE(log, "Object '" << info.name << "'"
" not modified, will not reload. "
"Next update at "
<< ext::to_string(info.next_update_time));
continue; continue;
} }
@ -651,6 +678,8 @@ public:
/// Object was never loaded successfully and should be reloaded. /// Object was never loaded successfully and should be reloaded.
startLoading(info); startLoading(info);
} }
LOG_TRACE(log, "Object '" << info.name << "' is neither"
" loaded nor failed, so it will not be reloaded as outdated.");
} }
} }
} }
@ -844,8 +873,14 @@ private:
{ {
if (info.is_loading()) if (info.is_loading())
{ {
LOG_TRACE(log, "The object '" << info.name <<
"' is already being loaded, force = " << forced_to_reload << ".");
if (!forced_to_reload) if (!forced_to_reload)
{
return; return;
}
cancelLoading(info); cancelLoading(info);
} }
@ -855,6 +890,12 @@ private:
info.loading_start_time = std::chrono::system_clock::now(); info.loading_start_time = std::chrono::system_clock::now();
info.loading_end_time = TimePoint{}; info.loading_end_time = TimePoint{};
LOG_TRACE(log, "Will load the object '" << info.name << "' "
<< (enable_async_loading ? std::string("in background")
: "immediately")
<< ", force = " << forced_to_reload
<< ", loading_id = " << info.loading_id);
if (enable_async_loading) if (enable_async_loading)
{ {
/// Put a job to the thread pool for the loading. /// Put a job to the thread pool for the loading.
@ -882,6 +923,7 @@ private:
/// Does the loading, possibly in the separate thread. /// Does the loading, possibly in the separate thread.
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async) void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async)
{ {
LOG_TRACE(log, "Start loading object '" << name << "'");
try try
{ {
/// Prepare for loading. /// Prepare for loading.
@ -890,7 +932,11 @@ private:
LoadingGuardForAsyncLoad lock(async, mutex); LoadingGuardForAsyncLoad lock(async, mutex);
info = prepareToLoadSingleObject(name, loading_id, min_id_to_finish_loading_dependencies_, lock); info = prepareToLoadSingleObject(name, loading_id, min_id_to_finish_loading_dependencies_, lock);
if (!info) if (!info)
{
LOG_TRACE(log, "Could not lock object '" << name
<< "' for loading");
return; return;
}
} }
/// Previous version can be used as the base for new loading, enabling loading only part of data. /// Previous version can be used as the base for new loading, enabling loading only part of data.
@ -989,8 +1035,22 @@ private:
/// We should check if this is still the same loading as we were doing. /// We should check if this is still the same loading as we were doing.
/// This is necessary because the object could be removed or load with another config while the `mutex` was unlocked. /// This is necessary because the object could be removed or load with another config while the `mutex` was unlocked.
if (!info || !info->is_loading() || (info->loading_id != loading_id)) if (!info)
{
LOG_TRACE(log, "Next update time for '" << name << "' will not be set because this object was not found.");
return; return;
}
if (!info->is_loading())
{
LOG_TRACE(log, "Next update time for '" << name << "' will not be set because this object is not currently loading.");
return;
}
if (info->loading_id != loading_id)
{
LOG_TRACE(log, "Next update time for '" << name << "' will not be set because this object's current loading_id "
<< info->loading_id << " is different from the specified " << loading_id << ".");
return;
}
if (new_exception) if (new_exception)
{ {
@ -1018,6 +1078,8 @@ private:
info->last_successful_update_time = current_time; info->last_successful_update_time = current_time;
info->state_id = info->loading_id; info->state_id = info->loading_id;
info->next_update_time = next_update_time; info->next_update_time = next_update_time;
LOG_TRACE(log, "Next update time for '" << info->name
<< "' was set to " << ext::to_string(next_update_time));
} }
/// Removes the references to the loading thread from the maps. /// Removes the references to the loading thread from the maps.
@ -1046,21 +1108,50 @@ private:
if (loaded_object) if (loaded_object)
{ {
if (!loaded_object->supportUpdates()) if (!loaded_object->supportUpdates())
{
LOG_TRACE(log, "Supposed update time for "
"'" + loaded_object->getLoadableName() + "'"
" is never (loaded, does not support updates)");
return never; return never;
}
/// do not update loadable objects with zero as lifetime /// do not update loadable objects with zero as lifetime
const auto & lifetime = loaded_object->getLifetime(); const auto & lifetime = loaded_object->getLifetime();
if (lifetime.min_sec == 0 && lifetime.max_sec == 0) if (lifetime.min_sec == 0 && lifetime.max_sec == 0)
{
LOG_TRACE(log, "Supposed update time for "
"'" + loaded_object->getLoadableName() + "'"
" is never (loaded, lifetime 0)");
return never; return never;
}
if (!error_count) if (!error_count)
{ {
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec}; std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
return std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; auto result = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
LOG_TRACE(log, "Supposed update time for "
"'" << loaded_object->getLoadableName() << "'"
" is " << ext::to_string(result)
<< " (loaded, lifetime [" << lifetime.min_sec
<< ", " << lifetime.max_sec << "], no errors)");
return result;
} }
}
return std::chrono::system_clock::now() + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count)); auto result = std::chrono::system_clock::now() + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));
LOG_TRACE(log, "Supposed update time for '" << loaded_object->getLoadableName() << "'"
" is " << ext::to_string(result)
<< " (backoff, " << error_count << " errors)");
return result;
}
else
{
auto result = std::chrono::system_clock::now() + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));
LOG_TRACE(log, "Supposed update time for unspecified object "
" is " << ext::to_string(result)
<< " (backoff, " << error_count << " errors.");
return result;
}
} }
const CreateObjectFunction create_object; const CreateObjectFunction create_object;

View File

@ -111,6 +111,10 @@ def test_reload_after_loading(started_cluster):
assert query("SELECT dictGetInt32('file', 'a', toUInt64(9))") == "10\n" assert query("SELECT dictGetInt32('file', 'a', toUInt64(9))") == "10\n"
# Change the dictionaries' data. # Change the dictionaries' data.
# FIXME we sleep before this, because Poco 1.x has one-second granularity
# for mtime, and clickhouse will miss the update if we change the file too
# soon. Should probably be fixed by switching to use std::filesystem.
time.sleep(1)
replace_in_file_in_container('/etc/clickhouse-server/config.d/executable.xml', '8', '81') replace_in_file_in_container('/etc/clickhouse-server/config.d/executable.xml', '8', '81')
replace_in_file_in_container('/etc/clickhouse-server/config.d/file.txt', '10', '101') replace_in_file_in_container('/etc/clickhouse-server/config.d/file.txt', '10', '101')
@ -124,6 +128,7 @@ def test_reload_after_loading(started_cluster):
assert query("SELECT dictGetInt32('file', 'a', toUInt64(9))") == "101\n" assert query("SELECT dictGetInt32('file', 'a', toUInt64(9))") == "101\n"
# SYSTEM RELOAD DICTIONARIES reloads all loaded dictionaries. # SYSTEM RELOAD DICTIONARIES reloads all loaded dictionaries.
time.sleep(1) # see the comment above
replace_in_file_in_container('/etc/clickhouse-server/config.d/executable.xml', '81', '82') replace_in_file_in_container('/etc/clickhouse-server/config.d/executable.xml', '81', '82')
replace_in_file_in_container('/etc/clickhouse-server/config.d/file.txt', '101', '102') replace_in_file_in_container('/etc/clickhouse-server/config.d/file.txt', '101', '102')
query("SYSTEM RELOAD DICTIONARIES") query("SYSTEM RELOAD DICTIONARIES")
@ -132,6 +137,7 @@ def test_reload_after_loading(started_cluster):
# Configuration files are reloaded and lifetimes are checked automatically once in 5 seconds. # Configuration files are reloaded and lifetimes are checked automatically once in 5 seconds.
# Wait slightly more, to be sure it did reload. # Wait slightly more, to be sure it did reload.
time.sleep(1) # see the comment above
replace_in_file_in_container('/etc/clickhouse-server/config.d/executable.xml', '82', '83') replace_in_file_in_container('/etc/clickhouse-server/config.d/executable.xml', '82', '83')
replace_in_file_in_container('/etc/clickhouse-server/config.d/file.txt', '102', '103') replace_in_file_in_container('/etc/clickhouse-server/config.d/file.txt', '102', '103')
time.sleep(7) time.sleep(7)