merged with remote [#CLICKHOUSE-3033]

Nikolai Kochetov 2017-08-11 18:38:46 +03:00
commit 24b5a59f8c
5 changed files with 14 additions and 10 deletions

View File

@@ -290,7 +290,7 @@ struct Settings
     \
     /** If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster. \
       */ \
-    M(SettingBool, insert_distributed_sync, 0) \
+    M(SettingBool, insert_distributed_sync, false) \
     /** Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. \
       * Zero value means no timeout. \
       */ \
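Two things are visible in this hunk: the default for a `SettingBool` is now spelled `false` instead of `0`, matching the declared type, and the trailing backslashes show the whole list is an X-macro table that other code expands. A minimal sketch of that pattern, with illustrative macro names (not the actual ClickHouse macros):

```cpp
#include <iostream>

// Hypothetical X-macro table in the style of the Settings list above:
// each entry is (type, name, default); the backslashes keep the table
// inside a single macro definition.
#define APPLY_FOR_EXAMPLE_SETTINGS(M) \
    M(bool, insert_distributed_sync, false) \
    M(unsigned, insert_distributed_timeout_ms, 0)

struct ExampleSettings
{
// One expansion declares the fields with their defaults...
#define DECLARE(TYPE, NAME, DEFAULT) TYPE NAME = DEFAULT;
    APPLY_FOR_EXAMPLE_SETTINGS(DECLARE)
#undef DECLARE
};

int main()
{
    ExampleSettings settings;
// ...and the same table can be expanded again for printing, parsing, etc.
#define PRINT(TYPE, NAME, DEFAULT) std::cout << #NAME << " = " << settings.NAME << '\n';
    APPLY_FOR_EXAMPLE_SETTINGS(PRINT)
#undef PRINT
}
```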

View File

@@ -86,7 +86,7 @@ namespace
 }
 
-StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, ConnectionPoolPtr pool)
+StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool)
     : storage(storage), pool{pool}, path{storage.path + name + '/'}
     , current_batch_file_path{path + "current_batch.txt"}
     , default_sleep_time{storage.context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}

View File

@@ -16,7 +16,7 @@ namespace DB
 class StorageDistributedDirectoryMonitor
 {
 public:
-    StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, ConnectionPoolPtr pool);
+    StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool);
     ~StorageDistributedDirectoryMonitor();
 
     static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
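The signature change in both the definition above and this declaration swaps pass-by-value for pass-by-const-reference. Copying a `shared_ptr` costs an atomic reference-count increment and decrement on every call; a const reference defers the copy to the one place that actually stores the pointer. A sketch under those assumptions (the `ConnectionPool` stub is illustrative):

```cpp
#include <memory>

struct ConnectionPool {};                       // stand-in for the real class
using ConnectionPoolPtr = std::shared_ptr<ConnectionPool>;

class DirectoryMonitorSketch
{
public:
    // By-value would copy the shared_ptr (atomic refcount traffic) even
    // when the callee only reads it; const & defers the copy to the one
    // place that needs ownership: the member initializer.
    explicit DirectoryMonitorSketch(const ConnectionPoolPtr & pool_)
        : pool(pool_)                           // the single refcount bump happens here
    {
    }

private:
    ConnectionPoolPtr pool;
};

int main()
{
    auto pool = std::make_shared<ConnectionPool>();
    DirectoryMonitorSketch monitor(pool);       // no extra copy at the call boundary
}
```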

View File

@@ -93,7 +93,7 @@ ThreadPool::Job DistributedBlockOutputStream::createWritingJob(
         if (!current_memory_tracker)
         {
             current_memory_tracker = memory_tracker;
-            setThreadName("DistributedBlockOutputStreamProc");
+            setThreadName("DistrOutStrProc");
         }
         try
         {
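The thread name is shortened because Linux caps a thread name at 15 characters plus the terminating NUL; `pthread_setname_np` rejects anything longer with `ERANGE`, so the old 32-character name could never be applied. A standalone demonstration with plain pthreads (not ClickHouse's `setThreadName` wrapper):

```cpp
#include <pthread.h>
#include <cstdio>
#include <cstring>

int main()
{
    // 32 characters: exceeds the 15-character limit, fails with ERANGE on Linux.
    int rc = pthread_setname_np(pthread_self(), "DistributedBlockOutputStreamProc");
    std::printf("long name:  rc = %d (%s)\n", rc, std::strerror(rc));

    // 15 characters: fits, so the call succeeds.
    rc = pthread_setname_np(pthread_self(), "DistrOutStrProc");
    std::printf("short name: rc = %d (%s)\n", rc, std::strerror(rc));
}
```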
@@ -151,7 +151,8 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription(const Writi
         for (const auto & address : addresses_with_failovers[shard_id])
             if (!address.is_local)
             {
-                writeDescription(address, shard_id, blocks_inserted + (context.done_remote_jobs[remote_job_id++] ? 1 : 0));
+                writeDescription(address, shard_id, blocks_inserted + (context.done_remote_jobs[remote_job_id] ? 1 : 0));
+                ++remote_job_id;
                 if (shard_info.hasInternalReplication())
                     break;
             }
@@ -160,7 +161,8 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription(const Writi
 
         if (shard_info.isLocal())
         {
             const auto & address = shard_info.local_addresses.front();
-            writeDescription(address, shard_id, blocks_inserted + (context.done_local_jobs[local_job_id++] ? 1 : 0));
+            writeDescription(address, shard_id, blocks_inserted + (context.done_local_jobs[local_job_id] ? 1 : 0));
+            ++local_job_id;
         }
     }
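Splitting `done_remote_jobs[remote_job_id++]` into an indexed read plus a standalone increment (repeated for the local case here, and again in `createWritingJobs` below) keeps the side effect out of an already-busy argument expression. A trivial before/after sketch with illustrative names:

```cpp
#include <cstddef>
#include <vector>

// Before: the increment hides inside a long expression.
//   writeDescription(addr, shard, blocks_inserted + (done[job_id++] ? 1 : 0));
// After: the value read and the index advance are two separate, obvious steps.
int blocksFor(const std::vector<bool> & done, std::size_t & job_id, int blocks_inserted)
{
    int blocks = blocks_inserted + (done[job_id] ? 1 : 0);
    ++job_id;
    return blocks;
}

int main()
{
    std::vector<bool> done{true, false};
    std::size_t job_id = 0;
    return blocksFor(done, job_id, 0);   // returns 1; job_id is now 1
}
```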
@@ -184,7 +186,8 @@ void DistributedBlockOutputStream::createWritingJobs(WritingJobContext & context
         for (const auto & address : addresses_with_failovers[shard_id])
             if (!address.is_local)
             {
-                pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, remote_job_id++));
+                pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, remote_job_id));
+                ++remote_job_id;
                 if (shard_info.hasInternalReplication())
                     break;
             }
@@ -193,7 +196,8 @@ void DistributedBlockOutputStream::createWritingJobs(WritingJobContext & context
         if (shards_info[shard_id].isLocal())
         {
             const auto & address = shards_info[shard_id].local_addresses.front();
-            pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, local_job_id++));
+            pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, local_job_id));
+            ++local_job_id;
         }
     }
 }
@@ -270,7 +274,7 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
     {
         waitForUnfinishedJobs(context);
     }
-    catch(Exception & exception)
+    catch (Exception & exception)
     {
         exception.addMessage(getCurrentStateDescription(context));
         throw;
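Besides the `catch (` spacing fix, this hunk shows an enrich-and-rethrow pattern: catch the exception, append a description of the writer's current state, and rethrow so the caller sees both the original error and the context. ClickHouse's `Exception::addMessage` mutates the caught object in place and `throw;` preserves it; the sketch below approximates the idea with a plain `std::runtime_error` stand-in, rethrowing a new object instead:

```cpp
#include <iostream>
#include <stdexcept>
#include <string>

void waitForUnfinishedJobsStub()
{
    throw std::runtime_error("insertion timed out");   // illustrative failure
}

void writeSyncSketch()
{
    try
    {
        waitForUnfinishedJobsStub();
    }
    catch (const std::runtime_error & e)
    {
        // Enrich the error with the writer's current state, then rethrow.
        throw std::runtime_error(std::string(e.what()) + "; state: 3 of 5 shards written");
    }
}

int main()
{
    try { writeSyncSketch(); }
    catch (const std::exception & e) { std::cout << e.what() << '\n'; }
}
```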

View File

@@ -118,7 +118,7 @@ private:
 
     /// create directory monitors for each existing subdirectory
     void createDirectoryMonitors();
-    /// ensure directory monitor thread by subdirectory name creation
+    /// ensure directory monitor thread creation by subdirectory name
     void requireDirectoryMonitor(const std::string & name);
     /// ensure connection pool creation and return it
     ConnectionPoolPtr requireConnectionPool(const std::string & name);
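The `require*` methods here follow a create-on-first-use idiom: look the resource up by subdirectory name, create it if absent, cache it, and return it. A minimal thread-safe sketch of that idiom with hypothetical types (not the actual StorageDistributed members):

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct ConnectionPool {};                       // illustrative stand-in
using ConnectionPoolPtr = std::shared_ptr<ConnectionPool>;

class PoolRegistry
{
public:
    // Returns the pool for `name`, creating and caching it on first use.
    ConnectionPoolPtr requireConnectionPool(const std::string & name)
    {
        std::lock_guard<std::mutex> lock(mutex);
        auto & pool = pools[name];              // empty slot is default-constructed on first use
        if (!pool)
            pool = std::make_shared<ConnectionPool>();
        return pool;
    }

private:
    std::mutex mutex;
    std::map<std::string, ConnectionPoolPtr> pools;
};

int main()
{
    PoolRegistry registry;
    auto a = registry.requireConnectionPool("shard1_replica1");
    auto b = registry.requireConnectionPool("shard1_replica1");
    return a == b ? 0 : 1;                      // second call reuses the cached pool
}
```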