Fix race in Distributed table startup

Before this patch it was possible to have multiple directory monitors
for the same directory, one from the INSERT context, another one on
storage startup().
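The racy shape, distilled into a minimal, self-contained sketch (hypothetical names, not the actual ClickHouse code): the startup path constructed the monitor before taking the lock, while the INSERT path did a locked get-or-create for the same key, so the two contexts could each end up with a monitor for one directory:

    // Minimal sketch of the race (hypothetical names, not the actual code).
    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>

    struct Monitor {};  // stands in for StorageDistributedDirectoryMonitor

    std::mutex nodes_mutex;
    std::map<std::string, std::unique_ptr<Monitor>> nodes;

    Monitor & require_monitor(const std::string & key, bool startup)
    {
        if (startup)
        {
            // Constructed outside the lock: a concurrent INSERT can create
            // and start its own Monitor for `key` in the meantime.
            auto fresh = std::make_unique<Monitor>();
            std::lock_guard lock(nodes_mutex);
            auto & slot = nodes[key];
            // In release builds this silently *replaces* an existing
            // monitor; the old one may still be processing the same batch.
            slot = std::move(fresh);
            return *slot;
        }
        std::lock_guard lock(nodes_mutex);
        auto & slot = nodes[key];
        if (!slot)
            slot = std::make_unique<Monitor>();
        return *slot;
    }

    int main()
    {
        require_monitor("shard13_replica1", /*startup=*/false); // INSERT path
        require_monitor("shard13_replica1", /*startup=*/true);  // startup/ATTACH path: replaces it
    }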

Here is an example of the logs for this scenario:

    2022.12.07 12:12:27.552485 [ 39925 ] {a47fcb32-4f44-4dbd-94fe-0070d4ea0f6b} <Debug> DDLWorker: Executed query: DETACH TABLE inc.dist_urls_in
    ...
    2022.12.07 12:12:33.228449 [ 4408 ] {20c761d3-a46d-417b-9fcd-89a8919dd1fe} <Debug> executeQuery: (from 0.0.0.0:0, user: ) /* ddl_entry=query-0000089229 */ ATTACH TABLE inc.dist_urls_in (stage: Complete)
    ... this is the DirectoryMonitor created from the INSERT context for the old StoragePtr that had not been destroyed yet (the "was 1" below shows this could only have happened from the INSERT context) ...
    2022.12.07 12:12:35.556048 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Files set to 173 (was 1)
    2022.12.07 12:12:35.556078 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Bytes set to 29750181 (was 71004)
    2022.12.07 12:12:35.562716 [ 39536 ] {} <Trace> Connection (i13.ch:9000): Connected to ClickHouse server version 22.10.1.
    2022.12.07 12:12:35.562750 [ 39536 ] {} <Debug> inc.dist_urls_in.DirectoryMonitor: Sending a batch of 10 files to i13.ch:9000 (0.00 rows, 0.00 B bytes).
    ... this is the DirectoryMonitor that was created during ATTACH ...
    2022.12.07 12:12:35.802080 [ 39265 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Files set to 173 (was 0)
    2022.12.07 12:12:35.802107 [ 39265 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Bytes set to 29750181 (was 0)
    2022.12.07 12:12:35.834216 [ 39265 ] {} <Debug> inc.dist_urls_in.DirectoryMonitor: Sending a batch of 10 files to i13.ch:9000 (0.00 rows, 0.00 B bytes).
    ...
    2022.12.07 12:12:38.532627 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Sent a batch of 10 files (took 2976 ms).
    ...
    2022.12.07 12:12:38.601051 [ 39265 ] {} <Error> inc.dist_urls_in.DirectoryMonitor: std::exception. Code: 1001, type: std::__1::__fs::filesystem::filesystem_error, e.what() = filesystem error: in file_size: No such file or directory ["/data6/clickhouse/data/inc/dist_urls_in/shard13_replica1/66827403.bin"], Stack trace (when copying this message, always include the lines below):
    ...
    2022.12.07 12:12:54.132837 [ 4408 ] {20c761d3-a46d-417b-9fcd-89a8919dd1fe} <Debug> DDLWorker: Executed query: ATTACH TABLE inc.dist_urls_in

Eventually both monitors (they coexist for a short period of time before
one replaces the other) try to process the same batch (current_batch.txt),
and one of them fails because the file has already been removed.
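The "No such file or directory" error above is exactly what std::filesystem::file_size produces when the file is gone. A toy reproduction of the losing monitor's view (the file name is made up):

    #include <filesystem>
    #include <fstream>
    #include <iostream>

    namespace fs = std::filesystem;

    int main()
    {
        const fs::path bin = "queued.bin";      // stands in for a queued .bin file
        std::ofstream(bin) << "batch payload";

        fs::remove(bin);  // monitor A: batch sent, file deleted

        try
        {
            auto size = fs::file_size(bin);     // monitor B: same batch, file is gone
            std::cout << "size: " << size << '\n';
        }
        catch (const fs::filesystem_error & e)
        {
            // Prints "filesystem error: in file_size: No such file or directory ...",
            // matching the Code 1001 log line above.
            std::cerr << e.what() << '\n';
        }
    }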

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

Azat Khuzhin 2023-01-05 20:46:09 +01:00
parent 35431e91e3
commit 54fc6859ae

@@ -1234,40 +1234,20 @@ StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(
     const std::string & disk_path = disk->getPath();
     const std::string key(disk_path + name);
 
-    auto create_node_data = [&]()
-    {
-        ClusterNodeData data;
-        data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
-        data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
-            *this, disk, relative_data_path + name,
-            data.connection_pool,
-            monitors_blocker,
-            getContext()->getDistributedSchedulePool(),
-            /* initialize_from_disk= */ startup);
-        return data;
-    };
-
-    /// In case of startup the lock can be acquired later.
-    if (startup)
-    {
-        auto tmp_node_data = create_node_data();
-        std::lock_guard lock(cluster_nodes_mutex);
-        auto & node_data = cluster_nodes_data[key];
-        assert(!node_data.directory_monitor);
-        node_data = std::move(tmp_node_data);
-        return *node_data.directory_monitor;
-    }
-    else
-    {
-        std::lock_guard lock(cluster_nodes_mutex);
-        auto & node_data = cluster_nodes_data[key];
-        if (!node_data.directory_monitor)
-        {
-            node_data = create_node_data();
-        }
-        return *node_data.directory_monitor;
-    }
+    std::lock_guard lock(cluster_nodes_mutex);
+    auto & node_data = cluster_nodes_data[key];
+    if (!node_data.directory_monitor)
+    {
+        node_data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
+        node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
+            *this, disk, relative_data_path + name,
+            node_data.connection_pool,
+            monitors_blocker,
+            getContext()->getDistributedSchedulePool(),
+            /* initialize_from_disk= */ startup);
+    }
+    return *node_data.directory_monitor;
 }
 
 std::vector<StorageDistributedDirectoryMonitor::Status> StorageDistributed::getDirectoryMonitorsStatuses() const
 {
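With the startup special case gone, both the startup/ATTACH path and the INSERT path go through the same locked get-or-create: the monitor is constructed while cluster_nodes_mutex is held, so two contexts can no longer each create a DirectoryMonitor for the same directory. The /* initialize_from_disk= */ startup flag is still forwarded, so startup keeps its initialization-from-disk behavior.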