Fixes for clang-17

This commit is contained in:
Alexey Milovidov 2023-05-13 02:57:31 +02:00
parent c306deb1ae
commit 5a44dc26e7
52 changed files with 186 additions and 184 deletions

View File

@ -175,7 +175,7 @@ public:
Coordination::Stat stat{}; Coordination::Stat stat{};
String _some_data; String _some_data;
auto watch_callback = auto watch_callback =
[stale = stale] (const Coordination::WatchResponse & rsp) [my_stale = stale] (const Coordination::WatchResponse & rsp)
{ {
auto logger = &Poco::Logger::get("ClusterCopier"); auto logger = &Poco::Logger::get("ClusterCopier");
if (rsp.error == Coordination::Error::ZOK) if (rsp.error == Coordination::Error::ZOK)
@ -184,11 +184,11 @@ public:
{ {
case Coordination::CREATED: case Coordination::CREATED:
LOG_DEBUG(logger, "CleanStateClock change: CREATED, at {}", rsp.path); LOG_DEBUG(logger, "CleanStateClock change: CREATED, at {}", rsp.path);
stale->store(true); my_stale->store(true);
break; break;
case Coordination::CHANGED: case Coordination::CHANGED:
LOG_DEBUG(logger, "CleanStateClock change: CHANGED, at {}", rsp.path); LOG_DEBUG(logger, "CleanStateClock change: CHANGED, at {}", rsp.path);
stale->store(true); my_stale->store(true);
} }
} }
}; };

View File

@ -498,18 +498,18 @@ try
/// Prometheus (if defined and not setup yet with http_port) /// Prometheus (if defined and not setup yet with http_port)
port_name = "prometheus.port"; port_name = "prometheus.port";
createServer(listen_host, port_name, listen_try, [&, http_context = std::move(http_context)](UInt16 port) mutable createServer(listen_host, port_name, listen_try, [&, my_http_context = std::move(http_context)](UInt16 port) mutable
{ {
Poco::Net::ServerSocket socket; Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port); auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(http_context->getReceiveTimeout()); socket.setReceiveTimeout(my_http_context->getReceiveTimeout());
socket.setSendTimeout(http_context->getSendTimeout()); socket.setSendTimeout(my_http_context->getSendTimeout());
servers->emplace_back( servers->emplace_back(
listen_host, listen_host,
port_name, port_name,
"Prometheus: http://" + address.toString(), "Prometheus: http://" + address.toString(),
std::make_unique<HTTPServer>( std::make_unique<HTTPServer>(
std::move(http_context), createPrometheusMainHandlerFactory(*this, config_getter(), async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params)); std::move(my_http_context), createPrometheusMainHandlerFactory(*this, config_getter(), async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
}); });
} }

View File

@ -47,10 +47,10 @@ scope_guard AccessChangesNotifier::subscribeForChanges(AccessEntityType type, co
list.push_back(handler); list.push_back(handler);
auto handler_it = std::prev(list.end()); auto handler_it = std::prev(list.end());
return [handlers=handlers, type, handler_it] return [my_handlers = handlers, type, handler_it]
{ {
std::lock_guard lock2{handlers->mutex}; std::lock_guard lock2{my_handlers->mutex};
auto & list2 = handlers->by_type[static_cast<size_t>(type)]; auto & list2 = my_handlers->by_type[static_cast<size_t>(type)];
list2.erase(handler_it); list2.erase(handler_it);
}; };
} }
@ -63,13 +63,13 @@ scope_guard AccessChangesNotifier::subscribeForChanges(const UUID & id, const On
list.push_back(handler); list.push_back(handler);
auto handler_it = std::prev(list.end()); auto handler_it = std::prev(list.end());
return [handlers=handlers, it, handler_it] return [my_handlers = handlers, it, handler_it]
{ {
std::lock_guard lock2{handlers->mutex}; std::lock_guard lock2{my_handlers->mutex};
auto & list2 = it->second; auto & list2 = it->second;
list2.erase(handler_it); list2.erase(handler_it);
if (list2.empty()) if (list2.empty())
handlers->by_id.erase(it); my_handlers->by_id.erase(it);
}; };
} }

View File

@ -742,9 +742,9 @@ void DiskAccessStorage::restoreFromBackup(RestorerFromBackup & restorer)
bool replace_if_exists = (create_access == RestoreAccessCreationMode::kReplace); bool replace_if_exists = (create_access == RestoreAccessCreationMode::kReplace);
bool throw_if_exists = (create_access == RestoreAccessCreationMode::kCreate); bool throw_if_exists = (create_access == RestoreAccessCreationMode::kCreate);
restorer.addDataRestoreTask([this, entities = std::move(entities), replace_if_exists, throw_if_exists] restorer.addDataRestoreTask([this, my_entities = std::move(entities), replace_if_exists, throw_if_exists]
{ {
for (const auto & [id, entity] : entities) for (const auto & [id, entity] : my_entities)
insertWithID(id, entity, replace_if_exists, throw_if_exists, /* write_on_disk= */ true); insertWithID(id, entity, replace_if_exists, throw_if_exists, /* write_on_disk= */ true);
}); });
} }

View File

@ -26,10 +26,10 @@ scope_guard EnabledRoles::subscribeForChanges(const OnChangeHandler & handler) c
handlers->list.push_back(handler); handlers->list.push_back(handler);
auto it = std::prev(handlers->list.end()); auto it = std::prev(handlers->list.end());
return [handlers=handlers, it] return [my_handlers = handlers, it]
{ {
std::lock_guard lock2{handlers->mutex}; std::lock_guard lock2{my_handlers->mutex};
handlers->list.erase(it); my_handlers->list.erase(it);
}; };
} }
@ -53,10 +53,10 @@ void EnabledRoles::setRolesInfo(const std::shared_ptr<const EnabledRolesInfo> &
} }
notifications->join(scope_guard( notifications->join(scope_guard(
[info = info, handlers_to_notify = std::move(handlers_to_notify)] [my_info = info, my_handlers_to_notify = std::move(handlers_to_notify)]
{ {
for (const auto & handler : handlers_to_notify) for (const auto & handler : my_handlers_to_notify)
handler(info); handler(my_info);
})); }));
} }
} }

View File

@ -297,9 +297,9 @@ void MemoryAccessStorage::restoreFromBackup(RestorerFromBackup & restorer)
bool replace_if_exists = (create_access == RestoreAccessCreationMode::kReplace); bool replace_if_exists = (create_access == RestoreAccessCreationMode::kReplace);
bool throw_if_exists = (create_access == RestoreAccessCreationMode::kCreate); bool throw_if_exists = (create_access == RestoreAccessCreationMode::kCreate);
restorer.addDataRestoreTask([this, entities = std::move(entities), replace_if_exists, throw_if_exists] restorer.addDataRestoreTask([this, my_entities = std::move(entities), replace_if_exists, throw_if_exists]
{ {
for (const auto & [id, entity] : entities) for (const auto & [id, entity] : my_entities)
insertWithID(id, entity, replace_if_exists, throw_if_exists); insertWithID(id, entity, replace_if_exists, throw_if_exists);
}); });
} }

View File

@ -525,9 +525,9 @@ void ReplicatedAccessStorage::refreshEntities(const zkutil::ZooKeeperPtr & zooke
} }
const String zookeeper_uuids_path = zookeeper_path + "/uuid"; const String zookeeper_uuids_path = zookeeper_path + "/uuid";
auto watch_entities_list = [watched_queue = watched_queue](const Coordination::WatchResponse &) auto watch_entities_list = [my_watched_queue = watched_queue](const Coordination::WatchResponse &)
{ {
[[maybe_unused]] bool push_result = watched_queue->push(UUIDHelpers::Nil); [[maybe_unused]] bool push_result = my_watched_queue->push(UUIDHelpers::Nil);
}; };
Coordination::Stat stat; Coordination::Stat stat;
const auto entity_uuid_strs = zookeeper->getChildrenWatch(zookeeper_uuids_path, &stat, watch_entities_list); const auto entity_uuid_strs = zookeeper->getChildrenWatch(zookeeper_uuids_path, &stat, watch_entities_list);
@ -592,10 +592,10 @@ void ReplicatedAccessStorage::refreshEntityNoLock(const zkutil::ZooKeeperPtr & z
AccessEntityPtr ReplicatedAccessStorage::tryReadEntityFromZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id) const AccessEntityPtr ReplicatedAccessStorage::tryReadEntityFromZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id) const
{ {
const auto watch_entity = [watched_queue = watched_queue, id](const Coordination::WatchResponse & response) const auto watch_entity = [my_watched_queue = watched_queue, id](const Coordination::WatchResponse & response)
{ {
if (response.type == Coordination::Event::CHANGED) if (response.type == Coordination::Event::CHANGED)
[[maybe_unused]] bool push_result = watched_queue->push(id); [[maybe_unused]] bool push_result = my_watched_queue->push(id);
}; };
Coordination::Stat entity_stat; Coordination::Stat entity_stat;
@ -680,12 +680,12 @@ void ReplicatedAccessStorage::backup(BackupEntriesCollector & backup_entries_col
backup_entries_collector.addPostTask( backup_entries_collector.addPostTask(
[backup_entry = backup_entry_with_path.second, [backup_entry = backup_entry_with_path.second,
zookeeper_path = zookeeper_path, my_zookeeper_path = zookeeper_path,
type, type,
&backup_entries_collector, &backup_entries_collector,
backup_coordination] backup_coordination]
{ {
for (const String & path : backup_coordination->getReplicatedAccessFilePaths(zookeeper_path, type)) for (const String & path : backup_coordination->getReplicatedAccessFilePaths(my_zookeeper_path, type))
backup_entries_collector.addBackupEntry(path, backup_entry); backup_entries_collector.addBackupEntry(path, backup_entry);
}); });
} }
@ -708,9 +708,9 @@ void ReplicatedAccessStorage::restoreFromBackup(RestorerFromBackup & restorer)
bool replace_if_exists = (create_access == RestoreAccessCreationMode::kReplace); bool replace_if_exists = (create_access == RestoreAccessCreationMode::kReplace);
bool throw_if_exists = (create_access == RestoreAccessCreationMode::kCreate); bool throw_if_exists = (create_access == RestoreAccessCreationMode::kCreate);
restorer.addDataRestoreTask([this, entities = std::move(entities), replace_if_exists, throw_if_exists] restorer.addDataRestoreTask([this, my_entities = std::move(entities), replace_if_exists, throw_if_exists]
{ {
for (const auto & [id, entity] : entities) for (const auto & [id, entity] : my_entities)
insertWithID(id, entity, replace_if_exists, throw_if_exists); insertWithID(id, entity, replace_if_exists, throw_if_exists);
}); });
} }

View File

@ -173,13 +173,13 @@ BackupCoordinationRemote::BackupCoordinationRemote(
log, log,
get_zookeeper_, get_zookeeper_,
keeper_settings, keeper_settings,
[zookeeper_path = zookeeper_path, current_host = current_host, is_internal = is_internal] [my_zookeeper_path = zookeeper_path, my_current_host = current_host, my_is_internal = is_internal]
(WithRetries::FaultyKeeper & zk) (WithRetries::FaultyKeeper & zk)
{ {
/// Recreate this ephemeral node to signal that we are alive. /// Recreate this ephemeral node to signal that we are alive.
if (is_internal) if (my_is_internal)
{ {
String alive_node_path = zookeeper_path + "/stage/alive|" + current_host; String alive_node_path = my_zookeeper_path + "/stage/alive|" + my_current_host;
auto code = zk->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral); auto code = zk->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNODEEXISTS) if (code == Coordination::Error::ZNODEEXISTS)

View File

@ -470,17 +470,17 @@ std::vector<std::pair<ASTPtr, StoragePtr>> BackupEntriesCollector::findTablesInD
const auto & database_info = database_infos.at(database_name); const auto & database_info = database_infos.at(database_name);
const auto & database = database_info.database; const auto & database = database_info.database;
auto filter_by_table_name = [database_info = &database_info](const String & table_name) auto filter_by_table_name = [my_database_info = &database_info](const String & table_name)
{ {
/// We skip inner tables of materialized views. /// We skip inner tables of materialized views.
if (table_name.starts_with(".inner_id.")) if (table_name.starts_with(".inner_id."))
return false; return false;
if (database_info->tables.contains(table_name)) if (my_database_info->tables.contains(table_name))
return true; return true;
if (database_info->all_tables) if (my_database_info->all_tables)
return !database_info->except_table_names.contains(table_name); return !my_database_info->except_table_names.contains(table_name);
return false; return false;
}; };

View File

@ -208,7 +208,7 @@ void BackupImpl::openArchive()
if (!reader->fileExists(archive_name)) if (!reader->fileExists(archive_name))
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", backup_name_for_logging); throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", backup_name_for_logging);
size_t archive_size = reader->getFileSize(archive_name); size_t archive_size = reader->getFileSize(archive_name);
archive_reader = createArchiveReader(archive_name, [reader=reader, archive_name]{ return reader->readFile(archive_name); }, archive_size); archive_reader = createArchiveReader(archive_name, [my_reader = reader, archive_name]{ return my_reader->readFile(archive_name); }, archive_size);
archive_reader->setPassword(archive_params.password); archive_reader->setPassword(archive_params.password);
} }
else else

View File

@ -34,13 +34,13 @@ RestoreCoordinationRemote::RestoreCoordinationRemote(
log, log,
get_zookeeper_, get_zookeeper_,
keeper_settings, keeper_settings,
[zookeeper_path = zookeeper_path, current_host = current_host, is_internal = is_internal] [my_zookeeper_path = zookeeper_path, my_current_host = current_host, my_is_internal = is_internal]
(WithRetries::FaultyKeeper & zk) (WithRetries::FaultyKeeper & zk)
{ {
/// Recreate this ephemeral node to signal that we are alive. /// Recreate this ephemeral node to signal that we are alive.
if (is_internal) if (my_is_internal)
{ {
String alive_node_path = zookeeper_path + "/stage/alive|" + current_host; String alive_node_path = my_zookeeper_path + "/stage/alive|" + my_current_host;
auto code = zk->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral); auto code = zk->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNODEEXISTS) if (code == Coordination::Error::ZNODEEXISTS)

View File

@ -108,14 +108,14 @@ static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggesti
template <typename ConnectionType> template <typename ConnectionType>
void Suggest::load(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit) void Suggest::load(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit)
{ {
loading_thread = std::thread([context=Context::createCopy(context), connection_parameters, suggestion_limit, this] loading_thread = std::thread([my_context = Context::createCopy(context), connection_parameters, suggestion_limit, this]
{ {
ThreadStatus thread_status; ThreadStatus thread_status;
for (size_t retry = 0; retry < 10; ++retry) for (size_t retry = 0; retry < 10; ++retry)
{ {
try try
{ {
auto connection = ConnectionType::createConnection(connection_parameters, context); auto connection = ConnectionType::createConnection(connection_parameters, my_context);
fetch(*connection, connection_parameters.timeouts, getLoadSuggestionQuery(suggestion_limit, std::is_same_v<ConnectionType, LocalConnection>)); fetch(*connection, connection_parameters.timeouts, getLoadSuggestionQuery(suggestion_limit, std::is_same_v<ConnectionType, LocalConnection>));
} }
catch (const Exception & e) catch (const Exception & e)

View File

@ -942,9 +942,9 @@ ColumnPtr ColumnArray::compress() const
size_t byte_size = data_compressed->byteSize() + offsets_compressed->byteSize(); size_t byte_size = data_compressed->byteSize() + offsets_compressed->byteSize();
return ColumnCompressed::create(size(), byte_size, return ColumnCompressed::create(size(), byte_size,
[data_compressed = std::move(data_compressed), offsets_compressed = std::move(offsets_compressed)] [my_data_compressed = std::move(data_compressed), my_offsets_compressed = std::move(offsets_compressed)]
{ {
return ColumnArray::create(data_compressed->decompress(), offsets_compressed->decompress()); return ColumnArray::create(my_data_compressed->decompress(), my_offsets_compressed->decompress());
}); });
} }

View File

@ -64,7 +64,7 @@ public:
return ColumnCompressed::create( return ColumnCompressed::create(
size, size,
bytes, bytes,
[column = std::move(column)]{ return column; }); [my_column = std::move(column)]{ return my_column; });
} }
/// Helper methods for compression. /// Helper methods for compression.

View File

@ -386,11 +386,11 @@ ColumnPtr ColumnDecimal<T>::compress() const
const size_t compressed_size = compressed->size(); const size_t compressed_size = compressed->size();
return ColumnCompressed::create(data_size, compressed_size, return ColumnCompressed::create(data_size, compressed_size,
[compressed = std::move(compressed), column_size = data_size, scale = this->scale] [my_compressed = std::move(compressed), column_size = data_size, my_scale = this->scale]
{ {
auto res = ColumnDecimal<T>::create(column_size, scale); auto res = ColumnDecimal<T>::create(column_size, my_scale);
ColumnCompressed::decompressBuffer( ColumnCompressed::decompressBuffer(
compressed->data(), res->getData().data(), compressed->size(), column_size * sizeof(T)); my_compressed->data(), res->getData().data(), my_compressed->size(), column_size * sizeof(T));
return res; return res;
}); });
} }

View File

@ -393,13 +393,13 @@ ColumnPtr ColumnFixedString::compress() const
const size_t column_size = size(); const size_t column_size = size();
const size_t compressed_size = compressed->size(); const size_t compressed_size = compressed->size();
return ColumnCompressed::create(column_size, compressed_size, return ColumnCompressed::create(column_size, compressed_size,
[compressed = std::move(compressed), column_size, n = n] [my_compressed = std::move(compressed), column_size, my_n = n]
{ {
size_t chars_size = n * column_size; size_t chars_size = my_n * column_size;
auto res = ColumnFixedString::create(n); auto res = ColumnFixedString::create(my_n);
res->getChars().resize(chars_size); res->getChars().resize(chars_size);
ColumnCompressed::decompressBuffer( ColumnCompressed::decompressBuffer(
compressed->data(), res->getChars().data(), compressed->size(), chars_size); my_compressed->data(), res->getChars().data(), my_compressed->size(), chars_size);
return res; return res;
}); });
} }

View File

@ -312,9 +312,9 @@ ColumnPtr ColumnMap::compress() const
const auto byte_size = compressed->byteSize(); const auto byte_size = compressed->byteSize();
/// The order of evaluation of function arguments is unspecified /// The order of evaluation of function arguments is unspecified
/// and could cause interacting with object in moved-from state /// and could cause interacting with object in moved-from state
return ColumnCompressed::create(size(), byte_size, [compressed = std::move(compressed)] return ColumnCompressed::create(size(), byte_size, [my_compressed = std::move(compressed)]
{ {
return ColumnMap::create(compressed->decompress()); return ColumnMap::create(my_compressed->decompress());
}); });
} }

View File

@ -644,9 +644,9 @@ ColumnPtr ColumnNullable::compress() const
size_t byte_size = nested_column->byteSize() + null_map->byteSize(); size_t byte_size = nested_column->byteSize() + null_map->byteSize();
return ColumnCompressed::create(size(), byte_size, return ColumnCompressed::create(size(), byte_size,
[nested_column = std::move(nested_compressed), null_map = std::move(null_map_compressed)] [my_nested_column = std::move(nested_compressed), my_null_map = std::move(null_map_compressed)]
{ {
return ColumnNullable::create(nested_column->decompress(), null_map->decompress()); return ColumnNullable::create(my_nested_column->decompress(), my_null_map->decompress());
}); });
} }

View File

@ -738,9 +738,9 @@ ColumnPtr ColumnSparse::compress() const
size_t byte_size = values_compressed->byteSize() + offsets_compressed->byteSize(); size_t byte_size = values_compressed->byteSize() + offsets_compressed->byteSize();
return ColumnCompressed::create(size(), byte_size, return ColumnCompressed::create(size(), byte_size,
[values_compressed = std::move(values_compressed), offsets_compressed = std::move(offsets_compressed), size = size()] [my_values_compressed = std::move(values_compressed), my_offsets_compressed = std::move(offsets_compressed), size = size()]
{ {
return ColumnSparse::create(values_compressed->decompress(), offsets_compressed->decompress(), size); return ColumnSparse::create(my_values_compressed->decompress(), my_offsets_compressed->decompress(), size);
}); });
} }

View File

@ -532,8 +532,8 @@ ColumnPtr ColumnString::compress() const
const size_t offsets_compressed_size = offsets_compressed->size(); const size_t offsets_compressed_size = offsets_compressed->size();
return ColumnCompressed::create(source_offsets_elements, chars_compressed_size + offsets_compressed_size, return ColumnCompressed::create(source_offsets_elements, chars_compressed_size + offsets_compressed_size,
[ [
chars_compressed = std::move(chars_compressed), my_chars_compressed = std::move(chars_compressed),
offsets_compressed = std::move(offsets_compressed), my_offsets_compressed = std::move(offsets_compressed),
source_chars_size, source_chars_size,
source_offsets_elements source_offsets_elements
] ]
@ -544,10 +544,10 @@ ColumnPtr ColumnString::compress() const
res->getOffsets().resize(source_offsets_elements); res->getOffsets().resize(source_offsets_elements);
ColumnCompressed::decompressBuffer( ColumnCompressed::decompressBuffer(
chars_compressed->data(), res->getChars().data(), chars_compressed->size(), source_chars_size); my_chars_compressed->data(), res->getChars().data(), my_chars_compressed->size(), source_chars_size);
ColumnCompressed::decompressBuffer( ColumnCompressed::decompressBuffer(
offsets_compressed->data(), res->getOffsets().data(), offsets_compressed->size(), source_offsets_elements * sizeof(Offset)); my_offsets_compressed->data(), res->getOffsets().data(), my_offsets_compressed->size(), source_offsets_elements * sizeof(Offset));
return res; return res;
}); });

View File

@ -552,11 +552,11 @@ ColumnPtr ColumnTuple::compress() const
} }
return ColumnCompressed::create(size(), byte_size, return ColumnCompressed::create(size(), byte_size,
[compressed = std::move(compressed)]() mutable [my_compressed = std::move(compressed)]() mutable
{ {
for (auto & column : compressed) for (auto & column : my_compressed)
column = column->decompress(); column = column->decompress();
return ColumnTuple::create(compressed); return ColumnTuple::create(my_compressed);
}); });
} }

View File

@ -927,11 +927,11 @@ ColumnPtr ColumnVector<T>::compress() const
const size_t compressed_size = compressed->size(); const size_t compressed_size = compressed->size();
return ColumnCompressed::create(data_size, compressed_size, return ColumnCompressed::create(data_size, compressed_size,
[compressed = std::move(compressed), column_size = data_size] [my_compressed = std::move(compressed), column_size = data_size]
{ {
auto res = ColumnVector<T>::create(column_size); auto res = ColumnVector<T>::create(column_size);
ColumnCompressed::decompressBuffer( ColumnCompressed::decompressBuffer(
compressed->data(), res->getData().data(), compressed->size(), column_size * sizeof(T)); my_compressed->data(), res->getData().data(), my_compressed->size(), column_size * sizeof(T));
return res; return res;
}); });
} }

View File

@ -206,21 +206,21 @@ public:
/// - If this will throw an exception, the destructor won't be called /// - If this will throw an exception, the destructor won't be called
/// - this pointer cannot be passed in the lambda, since after detach() it will not be valid /// - this pointer cannot be passed in the lambda, since after detach() it will not be valid
GlobalThreadPool::instance().scheduleOrThrow([ GlobalThreadPool::instance().scheduleOrThrow([
state = state, my_state = state,
func = std::forward<Function>(func), my_func = std::forward<Function>(func),
args = std::make_tuple(std::forward<Args>(args)...)]() mutable /// mutable is needed to destroy capture my_args = std::make_tuple(std::forward<Args>(args)...)]() mutable /// mutable is needed to destroy capture
{ {
SCOPE_EXIT( SCOPE_EXIT(
state->thread_id = std::thread::id(); my_state->thread_id = std::thread::id();
state->event.set(); my_state->event.set();
); );
state->thread_id = std::this_thread::get_id(); my_state->thread_id = std::this_thread::get_id();
/// This moves are needed to destroy function and arguments before exit. /// This moves are needed to destroy function and arguments before exit.
/// It will guarantee that after ThreadFromGlobalPool::join all captured params are destroyed. /// It will guarantee that after ThreadFromGlobalPool::join all captured params are destroyed.
auto function = std::move(func); auto function = std::move(my_func);
auto arguments = std::move(args); auto arguments = std::move(my_args);
/// Thread status holds raw pointer on query context, thus it always must be destroyed /// Thread status holds raw pointer on query context, thus it always must be destroyed
/// before sending signal that permits to join this thread. /// before sending signal that permits to join this thread.

View File

@ -234,12 +234,12 @@ TEST(ConcurrencyControl, MultipleThreads)
while (auto slot = slots->tryAcquire()) while (auto slot = slots->tryAcquire())
{ {
std::unique_lock lock{threads_mutex}; std::unique_lock lock{threads_mutex};
threads.emplace_back([&, slot = std::move(slot)] threads.emplace_back([&, my_slot = std::move(slot)]
{ {
pcg64 rng(randomSeed()); pcg64 rng(randomSeed());
std::uniform_int_distribution<size_t> distribution(1, cfg_work_us); std::uniform_int_distribution<size_t> distribution(1, cfg_work_us);
size_t steps = distribution(rng); size_t steps = distribution(rng);
for (size_t step = 0; step < steps; step++) for (size_t step = 0; step < steps; ++step)
{ {
sleepForMicroseconds(distribution(rng)); // emulate work sleepForMicroseconds(distribution(rng)); // emulate work
spawn_threads(); // upscale spawn_threads(); // upscale

View File

@ -471,9 +471,9 @@ void KeeperDispatcher::shutdown()
const auto raft_result = server->putRequestBatch(close_requests); const auto raft_result = server->putRequestBatch(close_requests);
auto sessions_closing_done_promise = std::make_shared<std::promise<void>>(); auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
auto sessions_closing_done = sessions_closing_done_promise->get_future(); auto sessions_closing_done = sessions_closing_done_promise->get_future();
raft_result->when_ready([sessions_closing_done_promise = std::move(sessions_closing_done_promise)]( raft_result->when_ready([my_sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/, nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
nuraft::ptr<std::exception> & /*exception*/) { sessions_closing_done_promise->set_value(); }); nuraft::ptr<std::exception> & /*exception*/) { my_sessions_closing_done_promise->set_value(); });
auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds(); auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready) if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)

View File

@ -342,34 +342,34 @@ void KeeperStorage::UncommittedState::applyDelta(const Delta & delta)
auto & [node, acls, last_applied_zxid] = nodes.at(delta.path); auto & [node, acls, last_applied_zxid] = nodes.at(delta.path);
std::visit( std::visit(
[&, &node = node, &acls = acls, &last_applied_zxid = last_applied_zxid]<typename DeltaType>(const DeltaType & operation) [&, &my_node = node, &my_acls = acls, &my_last_applied_zxid = last_applied_zxid]<typename DeltaType>(const DeltaType & operation)
{ {
if constexpr (std::same_as<DeltaType, CreateNodeDelta>) if constexpr (std::same_as<DeltaType, CreateNodeDelta>)
{ {
assert(!node); assert(!my_node);
node = std::make_shared<Node>(); my_node = std::make_shared<Node>();
node->stat = operation.stat; my_node->stat = operation.stat;
node->setData(operation.data); my_node->setData(operation.data);
acls = operation.acls; my_acls = operation.acls;
last_applied_zxid = delta.zxid; my_last_applied_zxid = delta.zxid;
} }
else if constexpr (std::same_as<DeltaType, RemoveNodeDelta>) else if constexpr (std::same_as<DeltaType, RemoveNodeDelta>)
{ {
assert(node); assert(my_node);
node = nullptr; my_node = nullptr;
last_applied_zxid = delta.zxid; my_last_applied_zxid = delta.zxid;
} }
else if constexpr (std::same_as<DeltaType, UpdateNodeDelta>) else if constexpr (std::same_as<DeltaType, UpdateNodeDelta>)
{ {
assert(node); assert(my_node);
node->invalidateDigestCache(); my_node->invalidateDigestCache();
operation.update_fn(*node); operation.update_fn(*node);
last_applied_zxid = delta.zxid; my_last_applied_zxid = delta.zxid;
} }
else if constexpr (std::same_as<DeltaType, SetACLDelta>) else if constexpr (std::same_as<DeltaType, SetACLDelta>)
{ {
acls = operation.acls; my_acls = operation.acls;
last_applied_zxid = delta.zxid; my_last_applied_zxid = delta.zxid;
} }
}, },
delta.operation); delta.operation);

View File

@ -140,7 +140,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
if (itr.key != "/") if (itr.key != "/")
{ {
auto parent_path = parentPath(itr.key); auto parent_path = parentPath(itr.key);
storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseName(path)); value.stat.numChildren++; }); storage.container.updateValue(parent_path, [my_path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseName(my_path)); ++value.stat.numChildren; });
} }
} }

View File

@ -80,10 +80,10 @@ struct DictionaryAttributeType
template <typename F> template <typename F>
constexpr void callOnDictionaryAttributeType(AttributeUnderlyingType type, F && func) constexpr void callOnDictionaryAttributeType(AttributeUnderlyingType type, F && func)
{ {
static_for<AttributeUnderlyingType>([type, func = std::forward<F>(func)](auto other) static_for<AttributeUnderlyingType>([type, my_func = std::forward<F>(func)](auto other)
{ {
if (type == other) if (type == other)
func(DictionaryAttributeType<other>{}); my_func(DictionaryAttributeType<other>{});
}); });
} }

View File

@ -3005,10 +3005,10 @@ namespace
bool google_wrappers_special_treatment) bool google_wrappers_special_treatment)
{ {
root_serializer_ptr = std::make_shared<ProtobufSerializer *>(); root_serializer_ptr = std::make_shared<ProtobufSerializer *>();
get_root_desc_function = [root_serializer_ptr = root_serializer_ptr](size_t indent) -> String get_root_desc_function = [my_root_serializer_ptr = root_serializer_ptr](size_t indent) -> String
{ {
WriteBufferFromOwnString buf; WriteBufferFromOwnString buf;
(*root_serializer_ptr)->describeTree(buf, indent); (*my_root_serializer_ptr)->describeTree(buf, indent);
return buf.str(); return buf.str();
}; };

View File

@ -56,18 +56,18 @@ void backupUserDefinedSQLObjects(
// They will only be returned for one of the hosts below, for the rest an empty list. // They will only be returned for one of the hosts below, for the rest an empty list.
// See also BackupCoordinationReplicatedSQLObjects class. // See also BackupCoordinationReplicatedSQLObjects class.
backup_entries_collector.addPostTask( backup_entries_collector.addPostTask(
[backup_entries = std::move(backup_entries), [my_backup_entries = std::move(backup_entries),
replication_id = std::move(replication_id), my_replication_id = std::move(replication_id),
object_type, object_type,
&backup_entries_collector, &backup_entries_collector,
backup_coordination] backup_coordination]
{ {
auto dirs = backup_coordination->getReplicatedSQLObjectsDirs(replication_id, object_type); auto dirs = backup_coordination->getReplicatedSQLObjectsDirs(my_replication_id, object_type);
for (const auto & dir : dirs) for (const auto & dir : dirs)
{ {
fs::path dir_fs{dir}; fs::path dir_fs{dir};
for (const auto & [file_name, entry] : backup_entries) for (const auto & [file_name, entry] : my_backup_entries)
{ {
backup_entries_collector.addBackupEntry(dir_fs / file_name, entry); backup_entries_collector.addBackupEntry(dir_fs / file_name, entry);
} }

View File

@ -283,11 +283,11 @@ bool UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectDataAndSetWatch(
UserDefinedSQLObjectType object_type, UserDefinedSQLObjectType object_type,
const String & object_name) const String & object_name)
{ {
const auto object_watcher = [watch_queue = watch_queue, object_type, object_name](const Coordination::WatchResponse & response) const auto object_watcher = [my_watch_queue = watch_queue, object_type, object_name](const Coordination::WatchResponse & response)
{ {
if (response.type == Coordination::Event::CHANGED) if (response.type == Coordination::Event::CHANGED)
{ {
[[maybe_unused]] bool inserted = watch_queue->emplace(object_type, object_name); [[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, object_name);
/// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called). /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called).
} }
/// Event::DELETED is processed as child event by getChildren watch /// Event::DELETED is processed as child event by getChildren watch
@ -346,9 +346,9 @@ ASTPtr UserDefinedSQLObjectsLoaderFromZooKeeper::tryLoadObject(
Strings UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectNamesAndSetWatch( Strings UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectNamesAndSetWatch(
const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type) const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
{ {
auto object_list_watcher = [watch_queue = watch_queue, object_type](const Coordination::WatchResponse &) auto object_list_watcher = [my_watch_queue = watch_queue, object_type](const Coordination::WatchResponse &)
{ {
[[maybe_unused]] bool inserted = watch_queue->emplace(object_type, ""); [[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, "");
/// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called). /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called).
}; };

View File

@ -87,7 +87,7 @@ bool ParallelReadBuffer::addReaderToPool()
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader), range_start, size)); auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader), range_start, size));
++active_working_reader; ++active_working_reader;
schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); }, 0); schedule([this, my_worker = std::move(worker)]() mutable { readerThreadFunction(std::move(my_worker)); }, 0);
return true; return true;
} }

View File

@ -190,9 +190,9 @@ void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key,
{ {
/// Wrap 'unique_ptr' with 'shared_ptr' to make this /// Wrap 'unique_ptr' with 'shared_ptr' to make this
/// lambda copyable and allow to save it to the thread pool. /// lambda copyable and allow to save it to the thread pool.
pool.scheduleOrThrowOnError([key, global_context, data = std::make_shared<InsertDataPtr>(std::move(data))]() mutable pool.scheduleOrThrowOnError([key, global_context, my_data = std::make_shared<InsertDataPtr>(std::move(data))]() mutable
{ {
processData(key, std::move(*data), std::move(global_context)); processData(key, std::move(*my_data), std::move(global_context));
}); });
} }

View File

@ -149,7 +149,7 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk,
int * version, int * version,
bool set_callback) bool set_callback)
{ {
auto watch_callback = [cluster_name, clusters_to_update=clusters_to_update](auto) { clusters_to_update->set(cluster_name); }; auto watch_callback = [cluster_name, my_clusters_to_update = clusters_to_update](auto) { my_clusters_to_update->set(cluster_name); };
Coordination::Stat stat; Coordination::Stat stat;
Strings nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? watch_callback : Coordination::WatchCallback{}); Strings nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? watch_callback : Coordination::WatchCallback{});

View File

@ -922,7 +922,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto finish_callback = [elem, auto finish_callback = [elem,
context, context,
ast, ast,
can_use_query_cache = can_use_query_cache, my_can_use_query_cache = can_use_query_cache,
enable_writes_to_query_cache = settings.enable_writes_to_query_cache, enable_writes_to_query_cache = settings.enable_writes_to_query_cache,
query_cache_store_results_of_queries_with_nondeterministic_functions = settings.query_cache_store_results_of_queries_with_nondeterministic_functions, query_cache_store_results_of_queries_with_nondeterministic_functions = settings.query_cache_store_results_of_queries_with_nondeterministic_functions,
log_queries, log_queries,
@ -940,7 +940,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto query_cache = context->getQueryCache(); auto query_cache = context->getQueryCache();
if (query_cache != nullptr if (query_cache != nullptr
&& pulling_pipeline && pulling_pipeline
&& can_use_query_cache && enable_writes_to_query_cache && my_can_use_query_cache && enable_writes_to_query_cache
&& (!astContainsNonDeterministicFunctions(ast, context) || query_cache_store_results_of_queries_with_nondeterministic_functions)) && (!astContainsNonDeterministicFunctions(ast, context) || query_cache_store_results_of_queries_with_nondeterministic_functions))
{ {
query_pipeline.finalizeWriteInQueryCache(); query_pipeline.finalizeWriteInQueryCache();
@ -1071,7 +1071,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
log_queries, log_queries,
log_queries_min_type = settings.log_queries_min_type, log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(), log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
quota(quota), my_quota(quota),
status_info_to_query_log, status_info_to_query_log,
implicit_txn_control, implicit_txn_control,
execute_implicit_tcl_query, execute_implicit_tcl_query,
@ -1082,8 +1082,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
else if (auto txn = context->getCurrentTransaction()) else if (auto txn = context->getCurrentTransaction())
txn->onException(); txn->onException();
if (quota) if (my_quota)
quota->used(QuotaType::ERRORS, 1, /* check_exceeded = */ false); my_quota->used(QuotaType::ERRORS, 1, /* check_exceeded = */ false);
elem.type = QueryLogElementType::EXCEPTION_WHILE_PROCESSING; elem.type = QueryLogElementType::EXCEPTION_WHILE_PROCESSING;
elem.exception_code = getCurrentExceptionCode(); elem.exception_code = getCurrentExceptionCode();

View File

@ -17,9 +17,9 @@ using ThreadPoolCallbackRunner = std::function<std::future<Result>(Callback &&,
template <typename Result, typename Callback = std::function<Result()>> template <typename Result, typename Callback = std::function<Result()>>
ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name) ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name)
{ {
return [pool = &pool, thread_group = CurrentThread::getGroup(), thread_name](Callback && callback, int64_t priority) mutable -> std::future<Result> return [my_pool = &pool, thread_group = CurrentThread::getGroup(), thread_name](Callback && callback, int64_t priority) mutable -> std::future<Result>
{ {
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, callback = std::move(callback)]() mutable -> Result auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, my_callback = std::move(callback)]() mutable -> Result
{ {
if (thread_group) if (thread_group)
CurrentThread::attachToGroup(thread_group); CurrentThread::attachToGroup(thread_group);
@ -29,7 +29,7 @@ ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool &
/// Release all captutred resources before detaching thread group /// Release all captutred resources before detaching thread group
/// Releasing has to use proper memory tracker which has been set here before callback /// Releasing has to use proper memory tracker which has been set here before callback
[[maybe_unused]] auto tmp = std::move(callback); [[maybe_unused]] auto tmp = std::move(my_callback);
} }
if (thread_group) if (thread_group)
@ -39,13 +39,13 @@ ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool &
setThreadName(thread_name.data()); setThreadName(thread_name.data());
return callback(); return my_callback();
}); });
auto future = task->get_future(); auto future = task->get_future();
/// ThreadPool is using "bigger is higher priority" instead of "smaller is more priority". /// ThreadPool is using "bigger is higher priority" instead of "smaller is more priority".
pool->scheduleOrThrow([task = std::move(task)]{ (*task)(); }, -priority); my_pool->scheduleOrThrow([my_task = std::move(task)]{ (*my_task)(); }, -priority);
return future; return future;
}; };

View File

@ -327,7 +327,7 @@ void PipelineExecutor::spawnThreads()
tasks.upscale(thread_num + 1); tasks.upscale(thread_num + 1);
/// Start new thread /// Start new thread
pool->scheduleOrThrowOnError([this, thread_num, thread_group = CurrentThread::getGroup(), slot = std::move(slot)] pool->scheduleOrThrowOnError([this, thread_num, thread_group = CurrentThread::getGroup(), my_slot = std::move(slot)]
{ {
SCOPE_EXIT_SAFE( SCOPE_EXIT_SAFE(
if (thread_group) if (thread_group)

View File

@ -669,10 +669,10 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
/// Let's split ranges to avoid reading much data. /// Let's split ranges to avoid reading much data.
auto split_ranges auto split_ranges
= [rows_granularity = data_settings->index_granularity, max_block_size = max_block_size](const auto & ranges, int direction) = [rows_granularity = data_settings->index_granularity, my_max_block_size = max_block_size](const auto & ranges, int direction)
{ {
MarkRanges new_ranges; MarkRanges new_ranges;
const size_t max_marks_in_range = (max_block_size + rows_granularity - 1) / rows_granularity; const size_t max_marks_in_range = (my_max_block_size + rows_granularity - 1) / rows_granularity;
size_t marks_in_range = 1; size_t marks_in_range = 1;
if (direction == 1) if (direction == 1)

View File

@ -146,31 +146,31 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
} }
auto lazily_create_stream = [ auto lazily_create_stream = [
shard = shard, shard_count = shard_count, query = shard.query, header = shard.header, my_shard = shard, my_shard_count = shard_count, query = shard.query, header = shard.header,
context = context, throttler = throttler, my_context = context, my_throttler = throttler,
main_table = main_table, table_func_ptr = table_func_ptr, my_main_table = main_table, my_table_func_ptr = table_func_ptr,
scalars = scalars, external_tables = external_tables, my_scalars = scalars, my_external_tables = external_tables,
stage = stage, local_delay = shard.local_delay, my_stage = stage, local_delay = shard.local_delay,
add_agg_info, add_totals, add_extremes, async_read, async_query_sending]() mutable add_agg_info, add_totals, add_extremes, async_read, async_query_sending]() mutable
-> QueryPipelineBuilder -> QueryPipelineBuilder
{ {
auto current_settings = context->getSettingsRef(); auto current_settings = my_context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover( auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
current_settings).getSaturated( current_settings).getSaturated(
current_settings.max_execution_time); current_settings.max_execution_time);
std::vector<ConnectionPoolWithFailover::TryResult> try_results; std::vector<ConnectionPoolWithFailover::TryResult> try_results;
try try
{ {
if (table_func_ptr) if (my_table_func_ptr)
try_results = shard.shard_info.pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY); try_results = my_shard.shard_info.pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
else else
try_results = shard.shard_info.pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, main_table.getQualifiedName()); try_results = my_shard.shard_info.pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, my_main_table.getQualifiedName());
} }
catch (const Exception & ex) catch (const Exception & ex)
{ {
if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED) if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
LOG_WARNING(&Poco::Logger::get("ClusterProxy::SelectStreamFactory"), LOG_WARNING(&Poco::Logger::get("ClusterProxy::SelectStreamFactory"),
"Connections to remote replicas of local shard {} failed, will use stale local replica", shard.shard_info.shard_num); "Connections to remote replicas of local shard {} failed, will use stale local replica", my_shard.shard_info.shard_num);
else else
throw; throw;
} }
@ -185,11 +185,11 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
if (try_results.empty() || local_delay < max_remote_delay) if (try_results.empty() || local_delay < max_remote_delay)
{ {
auto plan = createLocalPlan( auto plan = createLocalPlan(
query, header, context, stage, shard.shard_info.shard_num, shard_count, 0, 0, /*coordinator=*/nullptr); query, header, my_context, my_stage, my_shard.shard_info.shard_num, my_shard_count, 0, 0, /*coordinator=*/nullptr);
return std::move(*plan->buildQueryPipeline( return std::move(*plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), QueryPlanOptimizationSettings::fromContext(my_context),
BuildQueryPipelineSettings::fromContext(context))); BuildQueryPipelineSettings::fromContext(my_context)));
} }
else else
{ {
@ -200,10 +200,10 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
String query_string = formattedAST(query); String query_string = formattedAST(query);
scalars["_shard_num"] my_scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}}; = Block{{DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>( auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage); std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, my_stage);
auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending); auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending);
QueryPipelineBuilder builder; QueryPipelineBuilder builder;

View File

@ -107,7 +107,7 @@ struct ManyAggregatedData
{ {
// variant is moved here and will be destroyed in the destructor of the lambda function. // variant is moved here and will be destroyed in the destructor of the lambda function.
pool->trySchedule( pool->trySchedule(
[variant = std::move(variant), thread_group = CurrentThread::getGroup()]() [my_variant = std::move(variant), thread_group = CurrentThread::getGroup()]()
{ {
SCOPE_EXIT_SAFE( SCOPE_EXIT_SAFE(
if (thread_group) if (thread_group)

View File

@ -665,24 +665,24 @@ void RemoteQueryExecutor::sendExternalTables()
auto data = std::make_unique<ExternalTableData>(); auto data = std::make_unique<ExternalTableData>();
data->table_name = table.first; data->table_name = table.first;
data->creating_pipe_callback = [cur, limits, context = this->context]() data->creating_pipe_callback = [cur, limits, my_context = this->context]()
{ {
SelectQueryInfo query_info; SelectQueryInfo query_info;
auto metadata_snapshot = cur->getInMemoryMetadataPtr(); auto metadata_snapshot = cur->getInMemoryMetadataPtr();
auto storage_snapshot = cur->getStorageSnapshot(metadata_snapshot, context); auto storage_snapshot = cur->getStorageSnapshot(metadata_snapshot, my_context);
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage( QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
context, QueryProcessingStage::Complete, storage_snapshot, query_info); my_context, QueryProcessingStage::Complete, storage_snapshot, query_info);
QueryPlan plan; QueryPlan plan;
cur->read( cur->read(
plan, plan,
metadata_snapshot->getColumns().getNamesOfPhysical(), metadata_snapshot->getColumns().getNamesOfPhysical(),
storage_snapshot, query_info, context, storage_snapshot, query_info, my_context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1); read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
auto builder = plan.buildQueryPipeline( auto builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), QueryPlanOptimizationSettings::fromContext(my_context),
BuildQueryPipelineSettings::fromContext(context)); BuildQueryPipelineSettings::fromContext(my_context));
builder->resize(1); builder->resize(1);
builder->addTransform(std::make_shared<LimitsCheckingTransform>(builder->getHeader(), limits)); builder->addTransform(std::make_shared<LimitsCheckingTransform>(builder->getHeader(), limits));

View File

@ -800,11 +800,11 @@ void HTTPHandler::processQuery(
if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response")) if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response"))
used_output.out->addHeaderCORS(true); used_output.out->addHeaderCORS(true);
auto append_callback = [context = context] (ProgressCallback callback) auto append_callback = [my_context = context] (ProgressCallback callback)
{ {
auto prev = context->getProgressCallback(); auto prev = my_context->getProgressCallback();
context->setProgressCallback([prev, callback] (const Progress & progress) my_context->setProgressCallback([prev, callback] (const Progress & progress)
{ {
if (prev) if (prev)
prev(progress); prev(progress);

View File

@ -28,12 +28,12 @@ public:
template <typename... TArgs> template <typename... TArgs>
explicit HandlingRuleHTTPHandlerFactory(TArgs &&... args) explicit HandlingRuleHTTPHandlerFactory(TArgs &&... args)
{ {
creator = [args = std::tuple<TArgs...>(std::forward<TArgs>(args) ...)]() creator = [my_args = std::tuple<TArgs...>(std::forward<TArgs>(args) ...)]()
{ {
return std::apply([&](auto && ... endpoint_args) return std::apply([&](auto && ... endpoint_args)
{ {
return std::make_unique<TEndpoint>(std::forward<decltype(endpoint_args)>(endpoint_args)...); return std::make_unique<TEndpoint>(std::forward<decltype(endpoint_args)>(endpoint_args)...);
}, std::move(args)); }, std::move(my_args));
}; };
} }

View File

@ -340,10 +340,10 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
std::atomic<size_t> affected_rows {0}; std::atomic<size_t> affected_rows {0};
auto prev = query_context->getProgressCallback(); auto prev = query_context->getProgressCallback();
query_context->setProgressCallback([&, prev = prev](const Progress & progress) query_context->setProgressCallback([&, my_prev = prev](const Progress & progress)
{ {
if (prev) if (my_prev)
prev(progress); my_prev(progress);
affected_rows += progress.written_rows; affected_rows += progress.written_rows;
}); });

View File

@ -211,12 +211,12 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
if (ctx->getSettingsRef().schema_inference_use_cache_for_hdfs) if (ctx->getSettingsRef().schema_inference_use_cache_for_hdfs)
columns_from_cache = tryGetColumnsFromCache(paths, path_from_uri, last_mod_time, format, ctx); columns_from_cache = tryGetColumnsFromCache(paths, path_from_uri, last_mod_time, format, ctx);
ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer> ReadBufferIterator read_buffer_iterator = [&, my_uri_without_path = uri_without_path, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{ {
if (it == paths.end()) if (it == paths.end())
return nullptr; return nullptr;
auto compression = chooseCompressionMethod(*it, compression_method); auto compression = chooseCompressionMethod(*it, compression_method);
auto impl = std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef(), ctx->getReadSettings()); auto impl = std::make_unique<ReadBufferFromHDFS>(my_uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef(), ctx->getReadSettings());
const Int64 zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max; const Int64 zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
return wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max)); return wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max));
}; };

View File

@ -32,17 +32,17 @@ std::vector<String> AsyncBlockIDsCache::getChildren()
auto zookeeper = storage.getZooKeeper(); auto zookeeper = storage.getZooKeeper();
auto watch_callback = [last_time = this->last_updatetime.load() auto watch_callback = [last_time = this->last_updatetime.load()
, update_min_interval = this->update_min_interval , my_update_min_interval = this->update_min_interval
, task = task->shared_from_this()](const Coordination::WatchResponse &) , my_task = task->shared_from_this()](const Coordination::WatchResponse &)
{ {
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();
if (now - last_time < update_min_interval) if (now - last_time < my_update_min_interval)
{ {
std::chrono::milliseconds sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(update_min_interval - (now - last_time)); std::chrono::milliseconds sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(my_update_min_interval - (now - last_time));
task->scheduleAfter(sleep_time.count()); my_task->scheduleAfter(sleep_time.count());
} }
else else
task->schedule(); my_task->schedule();
}; };
std::vector<String> children; std::vector<String> children;
Coordination::Stat stat; Coordination::Stat stat;

View File

@ -1955,10 +1955,10 @@ try
outdated_unloaded_data_parts.pop_back(); outdated_unloaded_data_parts.pop_back();
} }
parts_futures.push_back(runner([&, part = part]() parts_futures.push_back(runner([&, my_part = part]()
{ {
auto res = loadDataPartWithRetries( auto res = loadDataPartWithRetries(
part->info, part->name, part->disk, my_part->info, my_part->name, part->disk,
DataPartState::Outdated, data_parts_mutex, loading_parts_initial_backoff_ms, DataPartState::Outdated, data_parts_mutex, loading_parts_initial_backoff_ms,
loading_parts_max_backoff_ms, loading_parts_max_tries); loading_parts_max_backoff_ms, loading_parts_max_tries);
@ -5226,9 +5226,9 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
[storage = std::static_pointer_cast<MergeTreeData>(shared_from_this()), [storage = std::static_pointer_cast<MergeTreeData>(shared_from_this()),
backup, backup,
part_path_in_backup = data_path_in_backup_fs / part_name, part_path_in_backup = data_path_in_backup_fs / part_name,
part_info=*part_info, my_part_info = *part_info,
restored_parts_holder] restored_parts_holder]
{ storage->restorePartFromBackup(restored_parts_holder, part_info, part_path_in_backup); }); { storage->restorePartFromBackup(restored_parts_holder, my_part_info, part_path_in_backup); });
++num_parts; ++num_parts;
} }

View File

@ -104,13 +104,13 @@ std::future<MergeTreeReaderPtr> MergeTreePrefetchedReadPool::createPrefetchedRea
/// and we cannot block either, therefore make prefetch inside the pool and put the future /// and we cannot block either, therefore make prefetch inside the pool and put the future
/// into the read task (MergeTreeReadTask). When a thread calls getTask(), it will wait for /// into the read task (MergeTreeReadTask). When a thread calls getTask(), it will wait for
/// it (if not yet ready) after getting the task. /// it (if not yet ready) after getting the task.
auto task = [=, reader = std::move(reader), context = getContext()]() mutable -> MergeTreeReaderPtr && auto task = [=, my_reader = std::move(reader), context = getContext()]() mutable -> MergeTreeReaderPtr &&
{ {
/// For async read metrics in system.query_log. /// For async read metrics in system.query_log.
PrefetchIncrement watch(context->getAsyncReadCounters()); PrefetchIncrement watch(context->getAsyncReadCounters());
reader->prefetchBeginOfRange(priority); my_reader->prefetchBeginOfRange(priority);
return std::move(reader); return std::move(my_reader);
}; };
return scheduleFromThreadPool<IMergeTreeDataPart::MergeTreeReaderPtr>(std::move(task), prefetch_threadpool, "ReadPrepare", priority); return scheduleFromThreadPool<IMergeTreeDataPart::MergeTreeReaderPtr>(std::move(task), prefetch_threadpool, "ReadPrepare", priority);
} }

View File

@ -989,8 +989,10 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
/// here lambda capture part name, it's ok since we'll not generate new one for this insert, /// here lambda capture part name, it's ok since we'll not generate new one for this insert,
/// see comments around 'part_committed_locally_but_zookeeper' flag /// see comments around 'part_committed_locally_but_zookeeper' flag
retries_ctl.actionAfterLastFailedRetry( retries_ctl.actionAfterLastFailedRetry(
[&storage = storage, part_name = part->name]() [&my_storage = storage, part_name = part->name]
{ storage.enqueuePartForCheck(part_name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER); }); {
my_storage.enqueuePartForCheck(part_name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
});
/// We do not know whether or not data has been inserted. /// We do not know whether or not data has been inserted.
retries_ctl.setUserError( retries_ctl.setUserError(

View File

@ -1115,10 +1115,10 @@ void StorageDistributed::read(
} }
additional_shard_filter_generator = additional_shard_filter_generator =
[&, custom_key_ast = std::move(custom_key_ast), shard_count = query_info.cluster->getShardCount()](uint64_t shard_num) -> ASTPtr [&, my_custom_key_ast = std::move(custom_key_ast), shard_count = query_info.cluster->getShardCount()](uint64_t shard_num) -> ASTPtr
{ {
return getCustomKeyFilterForParallelReplica( return getCustomKeyFilterForParallelReplica(
shard_count, shard_num - 1, custom_key_ast, settings.parallel_replicas_custom_key_filter_type, *this, local_context); shard_count, shard_num - 1, my_custom_key_ast, settings.parallel_replicas_custom_key_filter_type, *this, local_context);
}; };
} }
} }

View File

@ -9325,9 +9325,9 @@ void StorageReplicatedMergeTree::backupData(
/// This task will be executed after all replicas have collected their parts and the coordination is ready to /// This task will be executed after all replicas have collected their parts and the coordination is ready to
/// give us the final list of parts to add to the BackupEntriesCollector. /// give us the final list of parts to add to the BackupEntriesCollector.
auto post_collecting_task = [shared_id, auto post_collecting_task = [shared_id,
replica_name = getReplicaName(), my_replica_name = getReplicaName(),
coordination, coordination,
backup_entries = std::move(backup_entries), my_backup_entries = std::move(backup_entries),
&backup_entries_collector]() &backup_entries_collector]()
{ {
Strings data_paths = coordination->getReplicatedDataPaths(shared_id); Strings data_paths = coordination->getReplicatedDataPaths(shared_id);
@ -9336,10 +9336,10 @@ void StorageReplicatedMergeTree::backupData(
for (const auto & data_path : data_paths) for (const auto & data_path : data_paths)
data_paths_fs.push_back(data_path); data_paths_fs.push_back(data_path);
Strings part_names = coordination->getReplicatedPartNames(shared_id, replica_name); Strings part_names = coordination->getReplicatedPartNames(shared_id, my_replica_name);
std::unordered_set<std::string_view> part_names_set{part_names.begin(), part_names.end()}; std::unordered_set<std::string_view> part_names_set{part_names.begin(), part_names.end()};
for (const auto & [relative_path, backup_entry] : backup_entries) for (const auto & [relative_path, backup_entry] : my_backup_entries)
{ {
size_t slash_pos = relative_path.find('/'); size_t slash_pos = relative_path.find('/');
String part_name = relative_path.substr(0, slash_pos); String part_name = relative_path.substr(0, slash_pos);
@ -9349,7 +9349,7 @@ void StorageReplicatedMergeTree::backupData(
backup_entries_collector.addBackupEntry(data_path / relative_path, backup_entry); backup_entries_collector.addBackupEntry(data_path / relative_path, backup_entry);
} }
auto mutation_infos = coordination->getReplicatedMutations(shared_id, replica_name); auto mutation_infos = coordination->getReplicatedMutations(shared_id, my_replica_name);
for (const auto & mutation_info : mutation_infos) for (const auto & mutation_info : mutation_infos)
{ {
auto backup_entry = ReplicatedMergeTreeMutationEntry::parse(mutation_info.entry, mutation_info.id).backup(); auto backup_entry = ReplicatedMergeTreeMutationEntry::parse(mutation_info.entry, mutation_info.id).backup();

View File

@ -175,12 +175,12 @@ Pipe StorageSystemReplicas::read(
for (size_t i = 0; i < tables_size; ++i) for (size_t i = 0; i < tables_size; ++i)
{ {
thread_pool.scheduleOrThrowOnError([&, i=i] thread_pool.scheduleOrThrowOnError([&, my_i = i]
{ {
dynamic_cast<StorageReplicatedMergeTree &>( dynamic_cast<StorageReplicatedMergeTree &>(
*replicated_tables *replicated_tables
[(*col_database)[i].safeGet<const String &>()] [(*col_database)[my_i].safeGet<const String &>()]
[(*col_table)[i].safeGet<const String &>()]).getStatus(statuses[i], with_zk_fields); [(*col_table)[my_i].safeGet<const String &>()]).getStatus(statuses[my_i], with_zk_fields);
}); });
} }