Merge pull request #10486 from azat/dist-send-on-INSERT

Fix distributed send that are scheduled by INSERT query
This commit is contained in:
alexey-milovidov 2020-05-11 06:28:35 +03:00 committed by GitHub
commit ddc84163a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 69 additions and 71 deletions

View File

@ -236,6 +236,8 @@
<!-- Test only shard config for testing distributed storage -->
<test_shard_localhost>
<shard>
<!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
<!-- <internal_replication>false</internal_replication> -->
<replica>
<host>localhost</host>
<port>9000</port>

View File

@ -41,12 +41,14 @@ bool BackgroundSchedulePoolTaskInfo::schedule()
return true;
}
bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t ms)
bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t ms, bool overwrite)
{
std::lock_guard lock(schedule_mutex);
if (deactivated || scheduled)
return false;
if (delayed && !overwrite)
return false;
pool.scheduleDelayedTask(shared_from_this(), ms, lock);
return true;

View File

@ -102,7 +102,8 @@ public:
bool schedule();
/// Schedule for execution after specified delay.
bool scheduleAfter(size_t ms);
/// If overwrite is set then the task will be re-scheduled (if it was already scheduled, i.e. delayed == true).
bool scheduleAfter(size_t ms, bool overwrite = true);
/// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task.
void deactivate();

View File

@ -78,7 +78,6 @@ namespace
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool_)
/// It's important to initialize members before `thread` to avoid race.
: storage(storage_)
, pool(std::move(pool_))
, path{path_ + '/'}
@ -103,7 +102,6 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
if (!quit)
{
quit = true;
cond.notify_one();
task_handle->deactivate();
}
}
@ -122,7 +120,6 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
if (!quit)
{
quit = true;
cond.notify_one();
task_handle->deactivate();
}
@ -134,9 +131,10 @@ void StorageDistributedDirectoryMonitor::run()
{
std::unique_lock lock{mutex};
bool do_sleep = false;
while (!quit)
{
bool do_sleep = true;
do_sleep = true;
if (!monitor_blocker.isCancelled())
{
try
@ -169,15 +167,8 @@ void StorageDistributedDirectoryMonitor::run()
break;
}
if (!quit)
{
/// If there is no error, then it will be scheduled by the DistributedBlockOutputStream,
/// so this is just in case, hence it is distributed_directory_monitor_max_sleep_time_ms
if (error_count)
task_handle->scheduleAfter(sleep_time.count());
else
task_handle->scheduleAfter(max_sleep_time.count());
}
if (!quit && do_sleep)
task_handle->scheduleAfter(sleep_time.count());
}
@ -591,7 +582,7 @@ bool StorageDistributedDirectoryMonitor::scheduleAfter(size_t ms)
{
if (quit)
return false;
return task_handle->scheduleAfter(ms);
return task_handle->scheduleAfter(ms, false);
}
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)

View File

@ -66,7 +66,6 @@ private:
std::chrono::time_point<std::chrono::system_clock> last_decrease_time {std::chrono::system_clock::now()};
std::atomic<bool> quit {false};
std::mutex mutex;
std::condition_variable cond;
Logger * log;
ActionBlocker & monitor_blocker;

View File

@ -28,6 +28,7 @@
#include <Common/ProfileEvents.h>
#include <Common/escapeForFileName.h>
#include <Common/CurrentThread.h>
#include <Common/createHardLink.h>
#include <common/logger_useful.h>
#include <ext/range.h>
#include <ext/scope_guard.h>
@ -57,7 +58,6 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
extern const int CANNOT_LINK;
}
static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & block, const size_t repeats)
@ -554,76 +554,79 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
/** tmp directory is used to ensure atomicity of transactions
* and keep monitor thread out from reading incomplete data
*/
/// tmp directory is used to ensure atomicity of transactions
/// and keep monitor thread out from reading incomplete data
std::string first_file_tmp_path{};
auto first = true;
const auto & [disk, data_path] = storage.getPath();
/// write first file, hardlink the others
for (const auto & dir_name : dir_names)
auto it = dir_names.begin();
/// on first iteration write block to a temporary directory for subsequent
/// hardlinking to ensure the inode is not freed until we're done
{
const auto & [disk, data_path] = storage.getPath();
const std::string path(disk + data_path + dir_name + '/');
/// ensure shard subdirectory creation and notify storage
const std::string path(disk + data_path + *it);
Poco::File(path).createDirectory();
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
const auto & file_name = toString(storage.file_names_increment.get()) + ".bin";
const auto & block_file_path = path + file_name;
const std::string tmp_path(path + "/tmp/");
Poco::File(tmp_path).createDirectory();
/** on first iteration write block to a temporary directory for subsequent hardlinking to ensure
* the inode is not freed until we're done */
if (first)
{
first = false;
const std::string file_name(toString(storage.file_names_increment.get()) + ".bin");
const auto & tmp_path = path + "tmp/";
Poco::File(tmp_path).createDirectory();
const auto & block_file_tmp_path = tmp_path + file_name;
first_file_tmp_path = tmp_path + file_name;
first_file_tmp_path = block_file_tmp_path;
WriteBufferFromFile out{first_file_tmp_path};
CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};
WriteBufferFromFile out{block_file_tmp_path};
CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};
/// Prepare the header.
/// We wrap the header into a string for compatibility with older versions:
/// a shard will able to read the header partly and ignore other parts based on its version.
WriteBufferFromOwnString header_buf;
writeVarUInt(ClickHouseRevision::get(), header_buf);
writeStringBinary(query_string, header_buf);
context.getSettingsRef().serialize(header_buf);
context.getClientInfo().write(header_buf, ClickHouseRevision::get());
/// Prepare the header.
/// We wrap the header into a string for compatibility with older versions:
/// a shard will able to read the header partly and ignore other parts based on its version.
WriteBufferFromOwnString header_buf;
writeVarUInt(ClickHouseRevision::get(), header_buf);
writeStringBinary(query_string, header_buf);
context.getSettingsRef().serialize(header_buf);
context.getClientInfo().write(header_buf, ClickHouseRevision::get());
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);
/// Write the header.
const StringRef header = header_buf.stringRef();
writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
writeStringBinary(header, out);
writePODBinary(CityHash_v1_0_2::CityHash128(header.data, header.size), out);
/// Write the header.
const StringRef header = header_buf.stringRef();
writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
writeStringBinary(header, out);
writePODBinary(CityHash_v1_0_2::CityHash128(header.data, header.size), out);
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
// Create hardlink here to reuse increment number
const std::string block_file_path(path + '/' + file_name);
createHardLink(first_file_tmp_path, block_file_path);
}
++it;
auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms;
directory_monitor.scheduleAfter(sleep_ms.totalMilliseconds());
}
/// Make hardlinks
for (; it != dir_names.end(); ++it)
{
const std::string path(disk + data_path + *it);
Poco::File(path).createDirectory();
const std::string block_file_path(path + '/' + toString(storage.file_names_increment.get()) + ".bin");
if (link(first_file_tmp_path.data(), block_file_path.data()))
throwFromErrnoWithPath("Could not link " + block_file_path + " to " + first_file_tmp_path, block_file_path,
ErrorCodes::CANNOT_LINK);
createHardLink(first_file_tmp_path, block_file_path);
}
/** remove the temporary file, enabling the OS to reclaim inode after all threads
* have removed their corresponding files */
/// remove the temporary file, enabling the OS to reclaim inode after all threads
/// have removed their corresponding files
Poco::File(first_file_tmp_path).remove();
/// Notify
auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms;
for (const auto & dir_name : dir_names)
{
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
directory_monitor.scheduleAfter(sleep_ms.totalMilliseconds());
}
}

View File

@ -26,7 +26,7 @@ def _files_in_dist_mon(node, root, table):
'bash',
'-c',
# `-maxdepth 1` to avoid /tmp/ subdirectory
'find /{root}/data/test/{table}/default@127%2E0%2E0%2E2:9000 -maxdepth 1 -type f | wc -l'.format(root=root, table=table)
'find /{root}/data/test/{table}/default@127%2E0%2E0%2E2:9000 -maxdepth 1 -type f 2>/dev/null | wc -l'.format(root=root, table=table)
]).split('\n')[0])
def test_different_versions(start_cluster):
@ -41,7 +41,7 @@ def test_different_versions(start_cluster):
'default'
)
""")
# manual only
# manual only (but only for remote node)
node.query('SYSTEM STOP DISTRIBUTED SENDS test.dist_foo')
node.query('INSERT INTO test.dist_foo SELECT * FROM numbers(100)')