diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 60ff6e49b13..5d7d5d8173a 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -290,7 +290,7 @@ struct Settings \ /** If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster. \ */ \ - M(SettingBool, insert_distributed_sync, 0) \ + M(SettingBool, insert_distributed_sync, false) \ /** Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. \ * Zero value means no timeout. \ */ \ diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp index 07bed4a6cf4..b531953cde2 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp @@ -86,7 +86,7 @@ namespace } -StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, ConnectionPoolPtr pool) +StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool) : storage(storage), pool{pool}, path{storage.path + name + '/'} , current_batch_file_path{path + "current_batch.txt"} , default_sleep_time{storage.context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()} diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.h b/dbms/src/Storages/Distributed/DirectoryMonitor.h index 0b556fdbbfd..cb7ff07a5ef 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.h +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.h @@ -16,7 +16,7 @@ namespace DB class StorageDistributedDirectoryMonitor { public: - StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, ConnectionPoolPtr pool); + StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool); ~StorageDistributedDirectoryMonitor(); static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage); diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 239f8ba94d8..046a4b500df 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -93,7 +93,7 @@ ThreadPool::Job DistributedBlockOutputStream::createWritingJob( if (!current_memory_tracker) { current_memory_tracker = memory_tracker; - setThreadName("DistributedBlockOutputStreamProc"); + setThreadName("DistrOutStrProc"); } try { @@ -151,7 +151,8 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription(const Writi for (const auto & address : addresses_with_failovers[shard_id]) if (!address.is_local) { - writeDescription(address, shard_id, blocks_inserted + (context.done_remote_jobs[remote_job_id++] ? 1 : 0)); + writeDescription(address, shard_id, blocks_inserted + (context.done_remote_jobs[remote_job_id] ? 1 : 0)); + ++remote_job_id; if (shard_info.hasInternalReplication()) break; } @@ -160,7 +161,8 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription(const Writi if (shard_info.isLocal()) { const auto & address = shard_info.local_addresses.front(); - writeDescription(address, shard_id, blocks_inserted + (context.done_local_jobs[local_job_id++] ? 1 : 0)); + writeDescription(address, shard_id, blocks_inserted + (context.done_local_jobs[local_job_id] ? 1 : 0)); + ++local_job_id; } } @@ -184,7 +186,8 @@ void DistributedBlockOutputStream::createWritingJobs(WritingJobContext & context for (const auto & address : addresses_with_failovers[shard_id]) if (!address.is_local) { - pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, remote_job_id++)); + pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, remote_job_id)); + ++remote_job_id; if (shard_info.hasInternalReplication()) break; } @@ -193,7 +196,8 @@ void DistributedBlockOutputStream::createWritingJobs(WritingJobContext & context if (shards_info[shard_id].isLocal()) { const auto & address = shards_info[shard_id].local_addresses.front(); - pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, local_job_id++)); + pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, local_job_id)); + ++local_job_id; } } } @@ -270,7 +274,7 @@ void DistributedBlockOutputStream::writeSync(const Block & block) { waitForUnfinishedJobs(context); } - catch(Exception & exception) + catch (Exception & exception) { exception.addMessage(getCurrentStateDescription(context)); throw; diff --git a/dbms/src/Storages/StorageDistributed.h b/dbms/src/Storages/StorageDistributed.h index de79c4ed1da..d9e17b3d644 100644 --- a/dbms/src/Storages/StorageDistributed.h +++ b/dbms/src/Storages/StorageDistributed.h @@ -118,7 +118,7 @@ private: /// create directory monitors for each existing subdirectory void createDirectoryMonitors(); - /// ensure directory monitor thread by subdirectory name creation + /// ensure directory monitor thread creation by subdirectory name void requireDirectoryMonitor(const std::string & name); /// ensure connection pool creation and return it ConnectionPoolPtr requireConnectionPool(const std::string & name);