mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 09:02:00 +00:00
Merge pull request #8029 from ClickHouse/fix_dict_partially_update_failover
Fix external dictionaries failover, when they have invalidate_query.
This commit is contained in:
commit
a524e82a7f
@ -560,8 +560,8 @@ public:
|
|||||||
/// The function doesn't touch the objects which were never tried to load.
|
/// The function doesn't touch the objects which were never tried to load.
|
||||||
void reloadOutdated()
|
void reloadOutdated()
|
||||||
{
|
{
|
||||||
/// Iterate through all the objects and find loaded ones which should be checked if they were modified.
|
/// Iterate through all the objects and find loaded ones which should be checked if they need update.
|
||||||
std::unordered_map<LoadablePtr, bool> is_modified_map;
|
std::unordered_map<LoadablePtr, bool> should_update_map;
|
||||||
{
|
{
|
||||||
std::lock_guard lock{mutex};
|
std::lock_guard lock{mutex};
|
||||||
TimePoint now = std::chrono::system_clock::now();
|
TimePoint now = std::chrono::system_clock::now();
|
||||||
@ -569,22 +569,26 @@ public:
|
|||||||
{
|
{
|
||||||
const auto & info = name_and_info.second;
|
const auto & info = name_and_info.second;
|
||||||
if ((now >= info.next_update_time) && !info.loading() && info.loaded())
|
if ((now >= info.next_update_time) && !info.loading() && info.loaded())
|
||||||
is_modified_map.emplace(info.object, true);
|
should_update_map.emplace(info.object, info.failedToReload());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Find out which of the loaded objects were modified.
|
/// Find out which of the loaded objects were modified.
|
||||||
/// We couldn't perform these checks while we were building `is_modified_map` because
|
/// We couldn't perform these checks while we were building `should_update_map` because
|
||||||
/// the `mutex` should be unlocked while we're calling the function object->isModified()
|
/// the `mutex` should be unlocked while we're calling the function object->isModified()
|
||||||
for (auto & [object, is_modified_flag] : is_modified_map)
|
for (auto & [object, should_update_flag] : should_update_map)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
is_modified_flag = object->isModified();
|
/// Maybe alredy true, if we have an exception
|
||||||
|
if (!should_update_flag)
|
||||||
|
should_update_flag = object->isModified();
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
tryLogCurrentException(log, "Could not check if " + type_name + " '" + object->getName() + "' was modified");
|
tryLogCurrentException(log, "Could not check if " + type_name + " '" + object->getName() + "' was modified");
|
||||||
|
/// Cannot check isModified, so update
|
||||||
|
should_update_flag = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -598,19 +602,18 @@ public:
|
|||||||
{
|
{
|
||||||
if (info.loaded())
|
if (info.loaded())
|
||||||
{
|
{
|
||||||
auto it = is_modified_map.find(info.object);
|
auto it = should_update_map.find(info.object);
|
||||||
if (it == is_modified_map.end())
|
if (it == should_update_map.end())
|
||||||
continue; /// Object has been just loaded (it wasn't loaded while we were building the map `is_modified_map`), so we don't have to reload it right now.
|
continue; /// Object has been just loaded (it wasn't loaded while we were building the map `should_update_map`), so we don't have to reload it right now.
|
||||||
|
|
||||||
bool is_modified_flag = it->second;
|
bool should_update_flag = it->second;
|
||||||
if (!is_modified_flag)
|
if (!should_update_flag)
|
||||||
{
|
{
|
||||||
/// Object wasn't modified so we only have to set `next_update_time`.
|
|
||||||
info.next_update_time = calculateNextUpdateTime(info.object, info.error_count);
|
info.next_update_time = calculateNextUpdateTime(info.object, info.error_count);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Object was modified and should be reloaded.
|
/// Object was modified or it was failed to reload last time, so it should be reloaded.
|
||||||
startLoading(name, info);
|
startLoading(name, info);
|
||||||
}
|
}
|
||||||
else if (info.failed())
|
else if (info.failed())
|
||||||
@ -633,6 +636,7 @@ private:
|
|||||||
bool loading() const { return loading_id != 0; }
|
bool loading() const { return loading_id != 0; }
|
||||||
bool wasLoading() const { return loaded() || failed() || loading(); }
|
bool wasLoading() const { return loaded() || failed() || loading(); }
|
||||||
bool ready() const { return (loaded() || failed()) && !forced_to_reload; }
|
bool ready() const { return (loaded() || failed()) && !forced_to_reload; }
|
||||||
|
bool failedToReload() const { return loaded() && exception != nullptr; }
|
||||||
|
|
||||||
Status status() const
|
Status status() const
|
||||||
{
|
{
|
||||||
|
@ -1,2 +1,3 @@
|
|||||||
INITIALIZING DICTIONARY
|
INITIALIZING DICTIONARY
|
||||||
|
1
|
||||||
1 10
|
1 10
|
||||||
|
@ -36,6 +36,8 @@ LAYOUT(FLAT());
|
|||||||
|
|
||||||
SELECT 'INITIALIZING DICTIONARY';
|
SELECT 'INITIALIZING DICTIONARY';
|
||||||
|
|
||||||
|
SELECT dictGetUInt8('ordinary_db.dict1', 'second_column', toUInt64(100500));
|
||||||
|
|
||||||
SELECT lifetime_min, lifetime_max FROM system.dictionaries WHERE name = 'dict1';
|
SELECT lifetime_min, lifetime_max FROM system.dictionaries WHERE name = 'dict1';
|
||||||
|
|
||||||
DROP DICTIONARY IF EXISTS ordinary_db.dict1;
|
DROP DICTIONARY IF EXISTS ordinary_db.dict1;
|
||||||
|
@ -0,0 +1,5 @@
|
|||||||
|
122
|
||||||
|
|
||||||
|
Table dictdb.dict_invalidate doesn\'t exist.
|
||||||
|
|
||||||
|
133
|
84
dbms/tests/queries/0_stateless/01040_dictionary_invalidate_query_failover.sh
Executable file
84
dbms/tests/queries/0_stateless/01040_dictionary_invalidate_query_failover.sh
Executable file
@ -0,0 +1,84 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
. $CURDIR/../shell_config.sh
|
||||||
|
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "DROP DATABASE IF EXISTS dictdb"
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "CREATE DATABASE dictdb Engine = Ordinary"
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "
|
||||||
|
CREATE TABLE dictdb.dict_invalidate
|
||||||
|
ENGINE = Memory AS
|
||||||
|
SELECT
|
||||||
|
122 as dummy,
|
||||||
|
toDateTime('2019-10-29 18:51:35') AS last_time
|
||||||
|
FROM system.one"
|
||||||
|
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "
|
||||||
|
CREATE DICTIONARY dictdb.invalidate
|
||||||
|
(
|
||||||
|
dummy UInt64,
|
||||||
|
two UInt8 EXPRESSION dummy
|
||||||
|
)
|
||||||
|
PRIMARY KEY dummy
|
||||||
|
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'dict_invalidate' DB 'dictdb' INVALIDATE_QUERY 'select max(last_time) from dictdb.dict_invalidate'))
|
||||||
|
LIFETIME(MIN 0 MAX 1)
|
||||||
|
LAYOUT(FLAT())"
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "SELECT dictGetUInt8('dictdb.invalidate', 'two', toUInt64(122))"
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "SELECT last_exception FROM system.dictionaries WHERE database = 'dictdb' AND name = 'invalidate'"
|
||||||
|
|
||||||
|
# Bad solution, but it's quite complicated to detect, that invalidte_query stopped updates.
|
||||||
|
# In worst case we don't check anything, but fortunately it doesn't lead to false negatives.
|
||||||
|
sleep 5
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "DROP TABLE dictdb.dict_invalidate"
|
||||||
|
|
||||||
|
function check_exception_detected()
|
||||||
|
{
|
||||||
|
|
||||||
|
query_result=`$CLICKHOUSE_CLIENT --query "SELECT last_exception FROM system.dictionaries WHERE database = 'dictdb' AND name = 'invalidate'" 2>&1`
|
||||||
|
|
||||||
|
while [ -z "$query_result" ]
|
||||||
|
do
|
||||||
|
query_result=`$CLICKHOUSE_CLIENT --query "SELECT last_exception FROM system.dictionaries WHERE database = 'dictdb' AND name = 'invalidate'" 2>&1`
|
||||||
|
sleep 0.1
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
export -f check_exception_detected;
|
||||||
|
timeout 10 bash -c check_exception_detected 2> /dev/null
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "SELECT last_exception FROM system.dictionaries WHERE database = 'dictdb' AND name = 'invalidate'" 2>&1 | grep -Eo "Table dictdb.dict_invalidate .* exist."
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "
|
||||||
|
CREATE TABLE dictdb.dict_invalidate
|
||||||
|
ENGINE = Memory AS
|
||||||
|
SELECT
|
||||||
|
133 as dummy,
|
||||||
|
toDateTime('2019-10-29 18:51:35') AS last_time
|
||||||
|
FROM system.one"
|
||||||
|
|
||||||
|
function check_exception_fixed()
|
||||||
|
{
|
||||||
|
query_result=`$CLICKHOUSE_CLIENT --query "SELECT last_exception FROM system.dictionaries WHERE database = 'dictdb' AND name = 'invalidate'" 2>&1`
|
||||||
|
|
||||||
|
while [ "$query_result" ]
|
||||||
|
do
|
||||||
|
query_result=`$CLICKHOUSE_CLIENT --query "SELECT last_exception FROM system.dictionaries WHERE database = 'dictdb' AND name = 'invalidate'" 2>&1`
|
||||||
|
sleep 0.1
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
export -f check_exception_fixed;
|
||||||
|
timeout 10 bash -c check_exception_fixed 2> /dev/null
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "SELECT last_exception FROM system.dictionaries WHERE database = 'dictdb' AND name = 'invalidate'" 2>&1
|
||||||
|
$CLICKHOUSE_CLIENT --query "SELECT dictGetUInt8('dictdb.invalidate', 'two', toUInt64(133))"
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "DROP DATABASE IF EXISTS dictdb"
|
Loading…
Reference in New Issue
Block a user