mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
commit
caee95b89b
@ -175,7 +175,7 @@ public:
|
||||
Coordination::Stat stat{};
|
||||
String _some_data;
|
||||
auto watch_callback =
|
||||
[stale = stale] (const Coordination::WatchResponse & rsp)
|
||||
[my_stale = stale] (const Coordination::WatchResponse & rsp)
|
||||
{
|
||||
auto logger = &Poco::Logger::get("ClusterCopier");
|
||||
if (rsp.error == Coordination::Error::ZOK)
|
||||
@ -184,11 +184,11 @@ public:
|
||||
{
|
||||
case Coordination::CREATED:
|
||||
LOG_DEBUG(logger, "CleanStateClock change: CREATED, at {}", rsp.path);
|
||||
stale->store(true);
|
||||
my_stale->store(true);
|
||||
break;
|
||||
case Coordination::CHANGED:
|
||||
LOG_DEBUG(logger, "CleanStateClock change: CHANGED, at {}", rsp.path);
|
||||
stale->store(true);
|
||||
my_stale->store(true);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -498,18 +498,18 @@ try
|
||||
|
||||
/// Prometheus (if defined and not setup yet with http_port)
|
||||
port_name = "prometheus.port";
|
||||
createServer(listen_host, port_name, listen_try, [&, http_context = std::move(http_context)](UInt16 port) mutable
|
||||
createServer(listen_host, port_name, listen_try, [&, my_http_context = std::move(http_context)](UInt16 port) mutable
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(socket, listen_host, port);
|
||||
socket.setReceiveTimeout(http_context->getReceiveTimeout());
|
||||
socket.setSendTimeout(http_context->getSendTimeout());
|
||||
socket.setReceiveTimeout(my_http_context->getReceiveTimeout());
|
||||
socket.setSendTimeout(my_http_context->getSendTimeout());
|
||||
servers->emplace_back(
|
||||
listen_host,
|
||||
port_name,
|
||||
"Prometheus: http://" + address.toString(),
|
||||
std::make_unique<HTTPServer>(
|
||||
std::move(http_context), createPrometheusMainHandlerFactory(*this, config_getter(), async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
|
||||
std::move(my_http_context), createPrometheusMainHandlerFactory(*this, config_getter(), async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -47,10 +47,10 @@ scope_guard AccessChangesNotifier::subscribeForChanges(AccessEntityType type, co
|
||||
list.push_back(handler);
|
||||
auto handler_it = std::prev(list.end());
|
||||
|
||||
return [handlers=handlers, type, handler_it]
|
||||
return [my_handlers = handlers, type, handler_it]
|
||||
{
|
||||
std::lock_guard lock2{handlers->mutex};
|
||||
auto & list2 = handlers->by_type[static_cast<size_t>(type)];
|
||||
std::lock_guard lock2{my_handlers->mutex};
|
||||
auto & list2 = my_handlers->by_type[static_cast<size_t>(type)];
|
||||
list2.erase(handler_it);
|
||||
};
|
||||
}
|
||||
@ -63,13 +63,13 @@ scope_guard AccessChangesNotifier::subscribeForChanges(const UUID & id, const On
|
||||
list.push_back(handler);
|
||||
auto handler_it = std::prev(list.end());
|
||||
|
||||
return [handlers=handlers, it, handler_it]
|
||||
return [my_handlers = handlers, it, handler_it]
|
||||
{
|
||||
std::lock_guard lock2{handlers->mutex};
|
||||
std::lock_guard lock2{my_handlers->mutex};
|
||||
auto & list2 = it->second;
|
||||
list2.erase(handler_it);
|
||||
if (list2.empty())
|
||||
handlers->by_id.erase(it);
|
||||
my_handlers->by_id.erase(it);
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -742,9 +742,9 @@ void DiskAccessStorage::restoreFromBackup(RestorerFromBackup & restorer)
|
||||
bool replace_if_exists = (create_access == RestoreAccessCreationMode::kReplace);
|
||||
bool throw_if_exists = (create_access == RestoreAccessCreationMode::kCreate);
|
||||
|
||||
restorer.addDataRestoreTask([this, entities = std::move(entities), replace_if_exists, throw_if_exists]
|
||||
restorer.addDataRestoreTask([this, my_entities = std::move(entities), replace_if_exists, throw_if_exists]
|
||||
{
|
||||
for (const auto & [id, entity] : entities)
|
||||
for (const auto & [id, entity] : my_entities)
|
||||
insertWithID(id, entity, replace_if_exists, throw_if_exists, /* write_on_disk= */ true);
|
||||
});
|
||||
}
|
||||
|
@ -26,10 +26,10 @@ scope_guard EnabledRoles::subscribeForChanges(const OnChangeHandler & handler) c
|
||||
handlers->list.push_back(handler);
|
||||
auto it = std::prev(handlers->list.end());
|
||||
|
||||
return [handlers=handlers, it]
|
||||
return [my_handlers = handlers, it]
|
||||
{
|
||||
std::lock_guard lock2{handlers->mutex};
|
||||
handlers->list.erase(it);
|
||||
std::lock_guard lock2{my_handlers->mutex};
|
||||
my_handlers->list.erase(it);
|
||||
};
|
||||
}
|
||||
|
||||
@ -53,10 +53,10 @@ void EnabledRoles::setRolesInfo(const std::shared_ptr<const EnabledRolesInfo> &
|
||||
}
|
||||
|
||||
notifications->join(scope_guard(
|
||||
[info = info, handlers_to_notify = std::move(handlers_to_notify)]
|
||||
[my_info = info, my_handlers_to_notify = std::move(handlers_to_notify)]
|
||||
{
|
||||
for (const auto & handler : handlers_to_notify)
|
||||
handler(info);
|
||||
for (const auto & handler : my_handlers_to_notify)
|
||||
handler(my_info);
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
@ -297,9 +297,9 @@ void MemoryAccessStorage::restoreFromBackup(RestorerFromBackup & restorer)
|
||||
bool replace_if_exists = (create_access == RestoreAccessCreationMode::kReplace);
|
||||
bool throw_if_exists = (create_access == RestoreAccessCreationMode::kCreate);
|
||||
|
||||
restorer.addDataRestoreTask([this, entities = std::move(entities), replace_if_exists, throw_if_exists]
|
||||
restorer.addDataRestoreTask([this, my_entities = std::move(entities), replace_if_exists, throw_if_exists]
|
||||
{
|
||||
for (const auto & [id, entity] : entities)
|
||||
for (const auto & [id, entity] : my_entities)
|
||||
insertWithID(id, entity, replace_if_exists, throw_if_exists);
|
||||
});
|
||||
}
|
||||
|
@ -525,9 +525,9 @@ void ReplicatedAccessStorage::refreshEntities(const zkutil::ZooKeeperPtr & zooke
|
||||
}
|
||||
|
||||
const String zookeeper_uuids_path = zookeeper_path + "/uuid";
|
||||
auto watch_entities_list = [watched_queue = watched_queue](const Coordination::WatchResponse &)
|
||||
auto watch_entities_list = [my_watched_queue = watched_queue](const Coordination::WatchResponse &)
|
||||
{
|
||||
[[maybe_unused]] bool push_result = watched_queue->push(UUIDHelpers::Nil);
|
||||
[[maybe_unused]] bool push_result = my_watched_queue->push(UUIDHelpers::Nil);
|
||||
};
|
||||
Coordination::Stat stat;
|
||||
const auto entity_uuid_strs = zookeeper->getChildrenWatch(zookeeper_uuids_path, &stat, watch_entities_list);
|
||||
@ -592,10 +592,10 @@ void ReplicatedAccessStorage::refreshEntityNoLock(const zkutil::ZooKeeperPtr & z
|
||||
|
||||
AccessEntityPtr ReplicatedAccessStorage::tryReadEntityFromZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id) const
|
||||
{
|
||||
const auto watch_entity = [watched_queue = watched_queue, id](const Coordination::WatchResponse & response)
|
||||
const auto watch_entity = [my_watched_queue = watched_queue, id](const Coordination::WatchResponse & response)
|
||||
{
|
||||
if (response.type == Coordination::Event::CHANGED)
|
||||
[[maybe_unused]] bool push_result = watched_queue->push(id);
|
||||
[[maybe_unused]] bool push_result = my_watched_queue->push(id);
|
||||
};
|
||||
|
||||
Coordination::Stat entity_stat;
|
||||
@ -680,12 +680,12 @@ void ReplicatedAccessStorage::backup(BackupEntriesCollector & backup_entries_col
|
||||
|
||||
backup_entries_collector.addPostTask(
|
||||
[backup_entry = backup_entry_with_path.second,
|
||||
zookeeper_path = zookeeper_path,
|
||||
my_zookeeper_path = zookeeper_path,
|
||||
type,
|
||||
&backup_entries_collector,
|
||||
backup_coordination]
|
||||
{
|
||||
for (const String & path : backup_coordination->getReplicatedAccessFilePaths(zookeeper_path, type))
|
||||
for (const String & path : backup_coordination->getReplicatedAccessFilePaths(my_zookeeper_path, type))
|
||||
backup_entries_collector.addBackupEntry(path, backup_entry);
|
||||
});
|
||||
}
|
||||
@ -708,9 +708,9 @@ void ReplicatedAccessStorage::restoreFromBackup(RestorerFromBackup & restorer)
|
||||
bool replace_if_exists = (create_access == RestoreAccessCreationMode::kReplace);
|
||||
bool throw_if_exists = (create_access == RestoreAccessCreationMode::kCreate);
|
||||
|
||||
restorer.addDataRestoreTask([this, entities = std::move(entities), replace_if_exists, throw_if_exists]
|
||||
restorer.addDataRestoreTask([this, my_entities = std::move(entities), replace_if_exists, throw_if_exists]
|
||||
{
|
||||
for (const auto & [id, entity] : entities)
|
||||
for (const auto & [id, entity] : my_entities)
|
||||
insertWithID(id, entity, replace_if_exists, throw_if_exists);
|
||||
});
|
||||
}
|
||||
|
@ -173,13 +173,13 @@ BackupCoordinationRemote::BackupCoordinationRemote(
|
||||
log,
|
||||
get_zookeeper_,
|
||||
keeper_settings,
|
||||
[zookeeper_path = zookeeper_path, current_host = current_host, is_internal = is_internal]
|
||||
[my_zookeeper_path = zookeeper_path, my_current_host = current_host, my_is_internal = is_internal]
|
||||
(WithRetries::FaultyKeeper & zk)
|
||||
{
|
||||
/// Recreate this ephemeral node to signal that we are alive.
|
||||
if (is_internal)
|
||||
if (my_is_internal)
|
||||
{
|
||||
String alive_node_path = zookeeper_path + "/stage/alive|" + current_host;
|
||||
String alive_node_path = my_zookeeper_path + "/stage/alive|" + my_current_host;
|
||||
auto code = zk->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
|
||||
|
||||
if (code == Coordination::Error::ZNODEEXISTS)
|
||||
|
@ -470,17 +470,17 @@ std::vector<std::pair<ASTPtr, StoragePtr>> BackupEntriesCollector::findTablesInD
|
||||
const auto & database_info = database_infos.at(database_name);
|
||||
const auto & database = database_info.database;
|
||||
|
||||
auto filter_by_table_name = [database_info = &database_info](const String & table_name)
|
||||
auto filter_by_table_name = [my_database_info = &database_info](const String & table_name)
|
||||
{
|
||||
/// We skip inner tables of materialized views.
|
||||
if (table_name.starts_with(".inner_id."))
|
||||
return false;
|
||||
|
||||
if (database_info->tables.contains(table_name))
|
||||
if (my_database_info->tables.contains(table_name))
|
||||
return true;
|
||||
|
||||
if (database_info->all_tables)
|
||||
return !database_info->except_table_names.contains(table_name);
|
||||
if (my_database_info->all_tables)
|
||||
return !my_database_info->except_table_names.contains(table_name);
|
||||
|
||||
return false;
|
||||
};
|
||||
|
@ -208,7 +208,7 @@ void BackupImpl::openArchive()
|
||||
if (!reader->fileExists(archive_name))
|
||||
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", backup_name_for_logging);
|
||||
size_t archive_size = reader->getFileSize(archive_name);
|
||||
archive_reader = createArchiveReader(archive_name, [reader=reader, archive_name]{ return reader->readFile(archive_name); }, archive_size);
|
||||
archive_reader = createArchiveReader(archive_name, [my_reader = reader, archive_name]{ return my_reader->readFile(archive_name); }, archive_size);
|
||||
archive_reader->setPassword(archive_params.password);
|
||||
}
|
||||
else
|
||||
|
@ -34,13 +34,13 @@ RestoreCoordinationRemote::RestoreCoordinationRemote(
|
||||
log,
|
||||
get_zookeeper_,
|
||||
keeper_settings,
|
||||
[zookeeper_path = zookeeper_path, current_host = current_host, is_internal = is_internal]
|
||||
[my_zookeeper_path = zookeeper_path, my_current_host = current_host, my_is_internal = is_internal]
|
||||
(WithRetries::FaultyKeeper & zk)
|
||||
{
|
||||
/// Recreate this ephemeral node to signal that we are alive.
|
||||
if (is_internal)
|
||||
if (my_is_internal)
|
||||
{
|
||||
String alive_node_path = zookeeper_path + "/stage/alive|" + current_host;
|
||||
String alive_node_path = my_zookeeper_path + "/stage/alive|" + my_current_host;
|
||||
auto code = zk->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
|
||||
|
||||
if (code == Coordination::Error::ZNODEEXISTS)
|
||||
|
@ -108,14 +108,14 @@ static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggesti
|
||||
template <typename ConnectionType>
|
||||
void Suggest::load(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit)
|
||||
{
|
||||
loading_thread = std::thread([context=Context::createCopy(context), connection_parameters, suggestion_limit, this]
|
||||
loading_thread = std::thread([my_context = Context::createCopy(context), connection_parameters, suggestion_limit, this]
|
||||
{
|
||||
ThreadStatus thread_status;
|
||||
for (size_t retry = 0; retry < 10; ++retry)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto connection = ConnectionType::createConnection(connection_parameters, context);
|
||||
auto connection = ConnectionType::createConnection(connection_parameters, my_context);
|
||||
fetch(*connection, connection_parameters.timeouts, getLoadSuggestionQuery(suggestion_limit, std::is_same_v<ConnectionType, LocalConnection>));
|
||||
}
|
||||
catch (const Exception & e)
|
||||
|
@ -942,9 +942,9 @@ ColumnPtr ColumnArray::compress() const
|
||||
size_t byte_size = data_compressed->byteSize() + offsets_compressed->byteSize();
|
||||
|
||||
return ColumnCompressed::create(size(), byte_size,
|
||||
[data_compressed = std::move(data_compressed), offsets_compressed = std::move(offsets_compressed)]
|
||||
[my_data_compressed = std::move(data_compressed), my_offsets_compressed = std::move(offsets_compressed)]
|
||||
{
|
||||
return ColumnArray::create(data_compressed->decompress(), offsets_compressed->decompress());
|
||||
return ColumnArray::create(my_data_compressed->decompress(), my_offsets_compressed->decompress());
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -64,7 +64,7 @@ public:
|
||||
return ColumnCompressed::create(
|
||||
size,
|
||||
bytes,
|
||||
[column = std::move(column)]{ return column; });
|
||||
[my_column = std::move(column)]{ return my_column; });
|
||||
}
|
||||
|
||||
/// Helper methods for compression.
|
||||
|
@ -386,11 +386,11 @@ ColumnPtr ColumnDecimal<T>::compress() const
|
||||
|
||||
const size_t compressed_size = compressed->size();
|
||||
return ColumnCompressed::create(data_size, compressed_size,
|
||||
[compressed = std::move(compressed), column_size = data_size, scale = this->scale]
|
||||
[my_compressed = std::move(compressed), column_size = data_size, my_scale = this->scale]
|
||||
{
|
||||
auto res = ColumnDecimal<T>::create(column_size, scale);
|
||||
auto res = ColumnDecimal<T>::create(column_size, my_scale);
|
||||
ColumnCompressed::decompressBuffer(
|
||||
compressed->data(), res->getData().data(), compressed->size(), column_size * sizeof(T));
|
||||
my_compressed->data(), res->getData().data(), my_compressed->size(), column_size * sizeof(T));
|
||||
return res;
|
||||
});
|
||||
}
|
||||
|
@ -393,13 +393,13 @@ ColumnPtr ColumnFixedString::compress() const
|
||||
const size_t column_size = size();
|
||||
const size_t compressed_size = compressed->size();
|
||||
return ColumnCompressed::create(column_size, compressed_size,
|
||||
[compressed = std::move(compressed), column_size, n = n]
|
||||
[my_compressed = std::move(compressed), column_size, my_n = n]
|
||||
{
|
||||
size_t chars_size = n * column_size;
|
||||
auto res = ColumnFixedString::create(n);
|
||||
size_t chars_size = my_n * column_size;
|
||||
auto res = ColumnFixedString::create(my_n);
|
||||
res->getChars().resize(chars_size);
|
||||
ColumnCompressed::decompressBuffer(
|
||||
compressed->data(), res->getChars().data(), compressed->size(), chars_size);
|
||||
my_compressed->data(), res->getChars().data(), my_compressed->size(), chars_size);
|
||||
return res;
|
||||
});
|
||||
}
|
||||
|
@ -312,9 +312,9 @@ ColumnPtr ColumnMap::compress() const
|
||||
const auto byte_size = compressed->byteSize();
|
||||
/// The order of evaluation of function arguments is unspecified
|
||||
/// and could cause interacting with object in moved-from state
|
||||
return ColumnCompressed::create(size(), byte_size, [compressed = std::move(compressed)]
|
||||
return ColumnCompressed::create(size(), byte_size, [my_compressed = std::move(compressed)]
|
||||
{
|
||||
return ColumnMap::create(compressed->decompress());
|
||||
return ColumnMap::create(my_compressed->decompress());
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -644,9 +644,9 @@ ColumnPtr ColumnNullable::compress() const
|
||||
size_t byte_size = nested_column->byteSize() + null_map->byteSize();
|
||||
|
||||
return ColumnCompressed::create(size(), byte_size,
|
||||
[nested_column = std::move(nested_compressed), null_map = std::move(null_map_compressed)]
|
||||
[my_nested_column = std::move(nested_compressed), my_null_map = std::move(null_map_compressed)]
|
||||
{
|
||||
return ColumnNullable::create(nested_column->decompress(), null_map->decompress());
|
||||
return ColumnNullable::create(my_nested_column->decompress(), my_null_map->decompress());
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -738,9 +738,9 @@ ColumnPtr ColumnSparse::compress() const
|
||||
size_t byte_size = values_compressed->byteSize() + offsets_compressed->byteSize();
|
||||
|
||||
return ColumnCompressed::create(size(), byte_size,
|
||||
[values_compressed = std::move(values_compressed), offsets_compressed = std::move(offsets_compressed), size = size()]
|
||||
[my_values_compressed = std::move(values_compressed), my_offsets_compressed = std::move(offsets_compressed), size = size()]
|
||||
{
|
||||
return ColumnSparse::create(values_compressed->decompress(), offsets_compressed->decompress(), size);
|
||||
return ColumnSparse::create(my_values_compressed->decompress(), my_offsets_compressed->decompress(), size);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -532,8 +532,8 @@ ColumnPtr ColumnString::compress() const
|
||||
const size_t offsets_compressed_size = offsets_compressed->size();
|
||||
return ColumnCompressed::create(source_offsets_elements, chars_compressed_size + offsets_compressed_size,
|
||||
[
|
||||
chars_compressed = std::move(chars_compressed),
|
||||
offsets_compressed = std::move(offsets_compressed),
|
||||
my_chars_compressed = std::move(chars_compressed),
|
||||
my_offsets_compressed = std::move(offsets_compressed),
|
||||
source_chars_size,
|
||||
source_offsets_elements
|
||||
]
|
||||
@ -544,10 +544,10 @@ ColumnPtr ColumnString::compress() const
|
||||
res->getOffsets().resize(source_offsets_elements);
|
||||
|
||||
ColumnCompressed::decompressBuffer(
|
||||
chars_compressed->data(), res->getChars().data(), chars_compressed->size(), source_chars_size);
|
||||
my_chars_compressed->data(), res->getChars().data(), my_chars_compressed->size(), source_chars_size);
|
||||
|
||||
ColumnCompressed::decompressBuffer(
|
||||
offsets_compressed->data(), res->getOffsets().data(), offsets_compressed->size(), source_offsets_elements * sizeof(Offset));
|
||||
my_offsets_compressed->data(), res->getOffsets().data(), my_offsets_compressed->size(), source_offsets_elements * sizeof(Offset));
|
||||
|
||||
return res;
|
||||
});
|
||||
|
@ -552,11 +552,11 @@ ColumnPtr ColumnTuple::compress() const
|
||||
}
|
||||
|
||||
return ColumnCompressed::create(size(), byte_size,
|
||||
[compressed = std::move(compressed)]() mutable
|
||||
[my_compressed = std::move(compressed)]() mutable
|
||||
{
|
||||
for (auto & column : compressed)
|
||||
for (auto & column : my_compressed)
|
||||
column = column->decompress();
|
||||
return ColumnTuple::create(compressed);
|
||||
return ColumnTuple::create(my_compressed);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -927,11 +927,11 @@ ColumnPtr ColumnVector<T>::compress() const
|
||||
|
||||
const size_t compressed_size = compressed->size();
|
||||
return ColumnCompressed::create(data_size, compressed_size,
|
||||
[compressed = std::move(compressed), column_size = data_size]
|
||||
[my_compressed = std::move(compressed), column_size = data_size]
|
||||
{
|
||||
auto res = ColumnVector<T>::create(column_size);
|
||||
ColumnCompressed::decompressBuffer(
|
||||
compressed->data(), res->getData().data(), compressed->size(), column_size * sizeof(T));
|
||||
my_compressed->data(), res->getData().data(), my_compressed->size(), column_size * sizeof(T));
|
||||
return res;
|
||||
});
|
||||
}
|
||||
|
@ -206,21 +206,21 @@ public:
|
||||
/// - If this will throw an exception, the destructor won't be called
|
||||
/// - this pointer cannot be passed in the lambda, since after detach() it will not be valid
|
||||
GlobalThreadPool::instance().scheduleOrThrow([
|
||||
state = state,
|
||||
func = std::forward<Function>(func),
|
||||
args = std::make_tuple(std::forward<Args>(args)...)]() mutable /// mutable is needed to destroy capture
|
||||
my_state = state,
|
||||
my_func = std::forward<Function>(func),
|
||||
my_args = std::make_tuple(std::forward<Args>(args)...)]() mutable /// mutable is needed to destroy capture
|
||||
{
|
||||
SCOPE_EXIT(
|
||||
state->thread_id = std::thread::id();
|
||||
state->event.set();
|
||||
my_state->thread_id = std::thread::id();
|
||||
my_state->event.set();
|
||||
);
|
||||
|
||||
state->thread_id = std::this_thread::get_id();
|
||||
my_state->thread_id = std::this_thread::get_id();
|
||||
|
||||
/// This moves are needed to destroy function and arguments before exit.
|
||||
/// It will guarantee that after ThreadFromGlobalPool::join all captured params are destroyed.
|
||||
auto function = std::move(func);
|
||||
auto arguments = std::move(args);
|
||||
auto function = std::move(my_func);
|
||||
auto arguments = std::move(my_args);
|
||||
|
||||
/// Thread status holds raw pointer on query context, thus it always must be destroyed
|
||||
/// before sending signal that permits to join this thread.
|
||||
|
@ -234,12 +234,12 @@ TEST(ConcurrencyControl, MultipleThreads)
|
||||
while (auto slot = slots->tryAcquire())
|
||||
{
|
||||
std::unique_lock lock{threads_mutex};
|
||||
threads.emplace_back([&, slot = std::move(slot)]
|
||||
threads.emplace_back([&, my_slot = std::move(slot)]
|
||||
{
|
||||
pcg64 rng(randomSeed());
|
||||
std::uniform_int_distribution<size_t> distribution(1, cfg_work_us);
|
||||
size_t steps = distribution(rng);
|
||||
for (size_t step = 0; step < steps; step++)
|
||||
for (size_t step = 0; step < steps; ++step)
|
||||
{
|
||||
sleepForMicroseconds(distribution(rng)); // emulate work
|
||||
spawn_threads(); // upscale
|
||||
|
@ -471,9 +471,9 @@ void KeeperDispatcher::shutdown()
|
||||
const auto raft_result = server->putRequestBatch(close_requests);
|
||||
auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
|
||||
auto sessions_closing_done = sessions_closing_done_promise->get_future();
|
||||
raft_result->when_ready([sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
|
||||
raft_result->when_ready([my_sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
|
||||
nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
|
||||
nuraft::ptr<std::exception> & /*exception*/) { sessions_closing_done_promise->set_value(); });
|
||||
nuraft::ptr<std::exception> & /*exception*/) { my_sessions_closing_done_promise->set_value(); });
|
||||
|
||||
auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
|
||||
if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
|
||||
|
@ -342,34 +342,34 @@ void KeeperStorage::UncommittedState::applyDelta(const Delta & delta)
|
||||
auto & [node, acls, last_applied_zxid] = nodes.at(delta.path);
|
||||
|
||||
std::visit(
|
||||
[&, &node = node, &acls = acls, &last_applied_zxid = last_applied_zxid]<typename DeltaType>(const DeltaType & operation)
|
||||
[&, &my_node = node, &my_acls = acls, &my_last_applied_zxid = last_applied_zxid]<typename DeltaType>(const DeltaType & operation)
|
||||
{
|
||||
if constexpr (std::same_as<DeltaType, CreateNodeDelta>)
|
||||
{
|
||||
assert(!node);
|
||||
node = std::make_shared<Node>();
|
||||
node->stat = operation.stat;
|
||||
node->setData(operation.data);
|
||||
acls = operation.acls;
|
||||
last_applied_zxid = delta.zxid;
|
||||
assert(!my_node);
|
||||
my_node = std::make_shared<Node>();
|
||||
my_node->stat = operation.stat;
|
||||
my_node->setData(operation.data);
|
||||
my_acls = operation.acls;
|
||||
my_last_applied_zxid = delta.zxid;
|
||||
}
|
||||
else if constexpr (std::same_as<DeltaType, RemoveNodeDelta>)
|
||||
{
|
||||
assert(node);
|
||||
node = nullptr;
|
||||
last_applied_zxid = delta.zxid;
|
||||
assert(my_node);
|
||||
my_node = nullptr;
|
||||
my_last_applied_zxid = delta.zxid;
|
||||
}
|
||||
else if constexpr (std::same_as<DeltaType, UpdateNodeDelta>)
|
||||
{
|
||||
assert(node);
|
||||
node->invalidateDigestCache();
|
||||
assert(my_node);
|
||||
my_node->invalidateDigestCache();
|
||||
operation.update_fn(*node);
|
||||
last_applied_zxid = delta.zxid;
|
||||
my_last_applied_zxid = delta.zxid;
|
||||
}
|
||||
else if constexpr (std::same_as<DeltaType, SetACLDelta>)
|
||||
{
|
||||
acls = operation.acls;
|
||||
last_applied_zxid = delta.zxid;
|
||||
my_acls = operation.acls;
|
||||
my_last_applied_zxid = delta.zxid;
|
||||
}
|
||||
},
|
||||
delta.operation);
|
||||
|
@ -140,7 +140,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
|
||||
if (itr.key != "/")
|
||||
{
|
||||
auto parent_path = parentPath(itr.key);
|
||||
storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseName(path)); value.stat.numChildren++; });
|
||||
storage.container.updateValue(parent_path, [my_path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseName(my_path)); ++value.stat.numChildren; });
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,10 +80,10 @@ struct DictionaryAttributeType
|
||||
template <typename F>
|
||||
constexpr void callOnDictionaryAttributeType(AttributeUnderlyingType type, F && func)
|
||||
{
|
||||
static_for<AttributeUnderlyingType>([type, func = std::forward<F>(func)](auto other)
|
||||
static_for<AttributeUnderlyingType>([type, my_func = std::forward<F>(func)](auto other)
|
||||
{
|
||||
if (type == other)
|
||||
func(DictionaryAttributeType<other>{});
|
||||
my_func(DictionaryAttributeType<other>{});
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -3005,10 +3005,10 @@ namespace
|
||||
bool google_wrappers_special_treatment)
|
||||
{
|
||||
root_serializer_ptr = std::make_shared<ProtobufSerializer *>();
|
||||
get_root_desc_function = [root_serializer_ptr = root_serializer_ptr](size_t indent) -> String
|
||||
get_root_desc_function = [my_root_serializer_ptr = root_serializer_ptr](size_t indent) -> String
|
||||
{
|
||||
WriteBufferFromOwnString buf;
|
||||
(*root_serializer_ptr)->describeTree(buf, indent);
|
||||
(*my_root_serializer_ptr)->describeTree(buf, indent);
|
||||
return buf.str();
|
||||
};
|
||||
|
||||
|
@ -56,18 +56,18 @@ void backupUserDefinedSQLObjects(
|
||||
// They will only be returned for one of the hosts below, for the rest an empty list.
|
||||
// See also BackupCoordinationReplicatedSQLObjects class.
|
||||
backup_entries_collector.addPostTask(
|
||||
[backup_entries = std::move(backup_entries),
|
||||
replication_id = std::move(replication_id),
|
||||
[my_backup_entries = std::move(backup_entries),
|
||||
my_replication_id = std::move(replication_id),
|
||||
object_type,
|
||||
&backup_entries_collector,
|
||||
backup_coordination]
|
||||
{
|
||||
auto dirs = backup_coordination->getReplicatedSQLObjectsDirs(replication_id, object_type);
|
||||
auto dirs = backup_coordination->getReplicatedSQLObjectsDirs(my_replication_id, object_type);
|
||||
|
||||
for (const auto & dir : dirs)
|
||||
{
|
||||
fs::path dir_fs{dir};
|
||||
for (const auto & [file_name, entry] : backup_entries)
|
||||
for (const auto & [file_name, entry] : my_backup_entries)
|
||||
{
|
||||
backup_entries_collector.addBackupEntry(dir_fs / file_name, entry);
|
||||
}
|
||||
|
@ -283,11 +283,11 @@ bool UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectDataAndSetWatch(
|
||||
UserDefinedSQLObjectType object_type,
|
||||
const String & object_name)
|
||||
{
|
||||
const auto object_watcher = [watch_queue = watch_queue, object_type, object_name](const Coordination::WatchResponse & response)
|
||||
const auto object_watcher = [my_watch_queue = watch_queue, object_type, object_name](const Coordination::WatchResponse & response)
|
||||
{
|
||||
if (response.type == Coordination::Event::CHANGED)
|
||||
{
|
||||
[[maybe_unused]] bool inserted = watch_queue->emplace(object_type, object_name);
|
||||
[[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, object_name);
|
||||
/// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called).
|
||||
}
|
||||
/// Event::DELETED is processed as child event by getChildren watch
|
||||
@ -346,9 +346,9 @@ ASTPtr UserDefinedSQLObjectsLoaderFromZooKeeper::tryLoadObject(
|
||||
Strings UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectNamesAndSetWatch(
|
||||
const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
|
||||
{
|
||||
auto object_list_watcher = [watch_queue = watch_queue, object_type](const Coordination::WatchResponse &)
|
||||
auto object_list_watcher = [my_watch_queue = watch_queue, object_type](const Coordination::WatchResponse &)
|
||||
{
|
||||
[[maybe_unused]] bool inserted = watch_queue->emplace(object_type, "");
|
||||
[[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, "");
|
||||
/// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called).
|
||||
};
|
||||
|
||||
|
@ -87,7 +87,7 @@ bool ParallelReadBuffer::addReaderToPool()
|
||||
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader), range_start, size));
|
||||
|
||||
++active_working_reader;
|
||||
schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); }, 0);
|
||||
schedule([this, my_worker = std::move(worker)]() mutable { readerThreadFunction(std::move(my_worker)); }, 0);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -190,9 +190,9 @@ void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key,
|
||||
{
|
||||
/// Wrap 'unique_ptr' with 'shared_ptr' to make this
|
||||
/// lambda copyable and allow to save it to the thread pool.
|
||||
pool.scheduleOrThrowOnError([key, global_context, data = std::make_shared<InsertDataPtr>(std::move(data))]() mutable
|
||||
pool.scheduleOrThrowOnError([key, global_context, my_data = std::make_shared<InsertDataPtr>(std::move(data))]() mutable
|
||||
{
|
||||
processData(key, std::move(*data), std::move(global_context));
|
||||
processData(key, std::move(*my_data), std::move(global_context));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -149,7 +149,7 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk,
|
||||
int * version,
|
||||
bool set_callback)
|
||||
{
|
||||
auto watch_callback = [cluster_name, clusters_to_update=clusters_to_update](auto) { clusters_to_update->set(cluster_name); };
|
||||
auto watch_callback = [cluster_name, my_clusters_to_update = clusters_to_update](auto) { my_clusters_to_update->set(cluster_name); };
|
||||
|
||||
Coordination::Stat stat;
|
||||
Strings nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? watch_callback : Coordination::WatchCallback{});
|
||||
|
@ -922,7 +922,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
auto finish_callback = [elem,
|
||||
context,
|
||||
ast,
|
||||
can_use_query_cache = can_use_query_cache,
|
||||
my_can_use_query_cache = can_use_query_cache,
|
||||
enable_writes_to_query_cache = settings.enable_writes_to_query_cache,
|
||||
query_cache_store_results_of_queries_with_nondeterministic_functions = settings.query_cache_store_results_of_queries_with_nondeterministic_functions,
|
||||
log_queries,
|
||||
@ -940,7 +940,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
auto query_cache = context->getQueryCache();
|
||||
if (query_cache != nullptr
|
||||
&& pulling_pipeline
|
||||
&& can_use_query_cache && enable_writes_to_query_cache
|
||||
&& my_can_use_query_cache && enable_writes_to_query_cache
|
||||
&& (!astContainsNonDeterministicFunctions(ast, context) || query_cache_store_results_of_queries_with_nondeterministic_functions))
|
||||
{
|
||||
query_pipeline.finalizeWriteInQueryCache();
|
||||
@ -1071,7 +1071,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
log_queries,
|
||||
log_queries_min_type = settings.log_queries_min_type,
|
||||
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
|
||||
quota(quota),
|
||||
my_quota(quota),
|
||||
status_info_to_query_log,
|
||||
implicit_txn_control,
|
||||
execute_implicit_tcl_query,
|
||||
@ -1082,8 +1082,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
else if (auto txn = context->getCurrentTransaction())
|
||||
txn->onException();
|
||||
|
||||
if (quota)
|
||||
quota->used(QuotaType::ERRORS, 1, /* check_exceeded = */ false);
|
||||
if (my_quota)
|
||||
my_quota->used(QuotaType::ERRORS, 1, /* check_exceeded = */ false);
|
||||
|
||||
elem.type = QueryLogElementType::EXCEPTION_WHILE_PROCESSING;
|
||||
elem.exception_code = getCurrentExceptionCode();
|
||||
|
@ -17,9 +17,9 @@ using ThreadPoolCallbackRunner = std::function<std::future<Result>(Callback &&,
|
||||
template <typename Result, typename Callback = std::function<Result()>>
|
||||
ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name)
|
||||
{
|
||||
return [pool = &pool, thread_group = CurrentThread::getGroup(), thread_name](Callback && callback, int64_t priority) mutable -> std::future<Result>
|
||||
return [my_pool = &pool, thread_group = CurrentThread::getGroup(), thread_name](Callback && callback, int64_t priority) mutable -> std::future<Result>
|
||||
{
|
||||
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, callback = std::move(callback)]() mutable -> Result
|
||||
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, my_callback = std::move(callback)]() mutable -> Result
|
||||
{
|
||||
if (thread_group)
|
||||
CurrentThread::attachToGroup(thread_group);
|
||||
@ -29,7 +29,7 @@ ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool &
|
||||
/// Release all captutred resources before detaching thread group
|
||||
/// Releasing has to use proper memory tracker which has been set here before callback
|
||||
|
||||
[[maybe_unused]] auto tmp = std::move(callback);
|
||||
[[maybe_unused]] auto tmp = std::move(my_callback);
|
||||
}
|
||||
|
||||
if (thread_group)
|
||||
@ -39,13 +39,13 @@ ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool &
|
||||
|
||||
setThreadName(thread_name.data());
|
||||
|
||||
return callback();
|
||||
return my_callback();
|
||||
});
|
||||
|
||||
auto future = task->get_future();
|
||||
|
||||
/// ThreadPool is using "bigger is higher priority" instead of "smaller is more priority".
|
||||
pool->scheduleOrThrow([task = std::move(task)]{ (*task)(); }, -priority);
|
||||
my_pool->scheduleOrThrow([my_task = std::move(task)]{ (*my_task)(); }, -priority);
|
||||
|
||||
return future;
|
||||
};
|
||||
|
@ -327,7 +327,7 @@ void PipelineExecutor::spawnThreads()
|
||||
tasks.upscale(thread_num + 1);
|
||||
|
||||
/// Start new thread
|
||||
pool->scheduleOrThrowOnError([this, thread_num, thread_group = CurrentThread::getGroup(), slot = std::move(slot)]
|
||||
pool->scheduleOrThrowOnError([this, thread_num, thread_group = CurrentThread::getGroup(), my_slot = std::move(slot)]
|
||||
{
|
||||
SCOPE_EXIT_SAFE(
|
||||
if (thread_group)
|
||||
|
@ -669,10 +669,10 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
|
||||
|
||||
/// Let's split ranges to avoid reading much data.
|
||||
auto split_ranges
|
||||
= [rows_granularity = data_settings->index_granularity, max_block_size = max_block_size](const auto & ranges, int direction)
|
||||
= [rows_granularity = data_settings->index_granularity, my_max_block_size = max_block_size](const auto & ranges, int direction)
|
||||
{
|
||||
MarkRanges new_ranges;
|
||||
const size_t max_marks_in_range = (max_block_size + rows_granularity - 1) / rows_granularity;
|
||||
const size_t max_marks_in_range = (my_max_block_size + rows_granularity - 1) / rows_granularity;
|
||||
size_t marks_in_range = 1;
|
||||
|
||||
if (direction == 1)
|
||||
|
@ -146,31 +146,31 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
|
||||
}
|
||||
|
||||
auto lazily_create_stream = [
|
||||
shard = shard, shard_count = shard_count, query = shard.query, header = shard.header,
|
||||
context = context, throttler = throttler,
|
||||
main_table = main_table, table_func_ptr = table_func_ptr,
|
||||
scalars = scalars, external_tables = external_tables,
|
||||
stage = stage, local_delay = shard.local_delay,
|
||||
my_shard = shard, my_shard_count = shard_count, query = shard.query, header = shard.header,
|
||||
my_context = context, my_throttler = throttler,
|
||||
my_main_table = main_table, my_table_func_ptr = table_func_ptr,
|
||||
my_scalars = scalars, my_external_tables = external_tables,
|
||||
my_stage = stage, local_delay = shard.local_delay,
|
||||
add_agg_info, add_totals, add_extremes, async_read, async_query_sending]() mutable
|
||||
-> QueryPipelineBuilder
|
||||
{
|
||||
auto current_settings = context->getSettingsRef();
|
||||
auto current_settings = my_context->getSettingsRef();
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
|
||||
current_settings).getSaturated(
|
||||
current_settings.max_execution_time);
|
||||
std::vector<ConnectionPoolWithFailover::TryResult> try_results;
|
||||
try
|
||||
{
|
||||
if (table_func_ptr)
|
||||
try_results = shard.shard_info.pool->getManyForTableFunction(timeouts, ¤t_settings, PoolMode::GET_MANY);
|
||||
if (my_table_func_ptr)
|
||||
try_results = my_shard.shard_info.pool->getManyForTableFunction(timeouts, ¤t_settings, PoolMode::GET_MANY);
|
||||
else
|
||||
try_results = shard.shard_info.pool->getManyChecked(timeouts, ¤t_settings, PoolMode::GET_MANY, main_table.getQualifiedName());
|
||||
try_results = my_shard.shard_info.pool->getManyChecked(timeouts, ¤t_settings, PoolMode::GET_MANY, my_main_table.getQualifiedName());
|
||||
}
|
||||
catch (const Exception & ex)
|
||||
{
|
||||
if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
|
||||
LOG_WARNING(&Poco::Logger::get("ClusterProxy::SelectStreamFactory"),
|
||||
"Connections to remote replicas of local shard {} failed, will use stale local replica", shard.shard_info.shard_num);
|
||||
"Connections to remote replicas of local shard {} failed, will use stale local replica", my_shard.shard_info.shard_num);
|
||||
else
|
||||
throw;
|
||||
}
|
||||
@ -185,11 +185,11 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
|
||||
if (try_results.empty() || local_delay < max_remote_delay)
|
||||
{
|
||||
auto plan = createLocalPlan(
|
||||
query, header, context, stage, shard.shard_info.shard_num, shard_count, 0, 0, /*coordinator=*/nullptr);
|
||||
query, header, my_context, my_stage, my_shard.shard_info.shard_num, my_shard_count, 0, 0, /*coordinator=*/nullptr);
|
||||
|
||||
return std::move(*plan->buildQueryPipeline(
|
||||
QueryPlanOptimizationSettings::fromContext(context),
|
||||
BuildQueryPipelineSettings::fromContext(context)));
|
||||
QueryPlanOptimizationSettings::fromContext(my_context),
|
||||
BuildQueryPipelineSettings::fromContext(my_context)));
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -200,10 +200,10 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
|
||||
|
||||
String query_string = formattedAST(query);
|
||||
|
||||
scalars["_shard_num"]
|
||||
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
|
||||
my_scalars["_shard_num"]
|
||||
= Block{{DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
|
||||
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||
std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
|
||||
std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, my_stage);
|
||||
|
||||
auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending);
|
||||
QueryPipelineBuilder builder;
|
||||
|
@ -107,7 +107,7 @@ struct ManyAggregatedData
|
||||
{
|
||||
// variant is moved here and will be destroyed in the destructor of the lambda function.
|
||||
pool->trySchedule(
|
||||
[variant = std::move(variant), thread_group = CurrentThread::getGroup()]()
|
||||
[my_variant = std::move(variant), thread_group = CurrentThread::getGroup()]()
|
||||
{
|
||||
SCOPE_EXIT_SAFE(
|
||||
if (thread_group)
|
||||
|
@ -665,24 +665,24 @@ void RemoteQueryExecutor::sendExternalTables()
|
||||
|
||||
auto data = std::make_unique<ExternalTableData>();
|
||||
data->table_name = table.first;
|
||||
data->creating_pipe_callback = [cur, limits, context = this->context]()
|
||||
data->creating_pipe_callback = [cur, limits, my_context = this->context]()
|
||||
{
|
||||
SelectQueryInfo query_info;
|
||||
auto metadata_snapshot = cur->getInMemoryMetadataPtr();
|
||||
auto storage_snapshot = cur->getStorageSnapshot(metadata_snapshot, context);
|
||||
auto storage_snapshot = cur->getStorageSnapshot(metadata_snapshot, my_context);
|
||||
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
|
||||
context, QueryProcessingStage::Complete, storage_snapshot, query_info);
|
||||
my_context, QueryProcessingStage::Complete, storage_snapshot, query_info);
|
||||
|
||||
QueryPlan plan;
|
||||
cur->read(
|
||||
plan,
|
||||
metadata_snapshot->getColumns().getNamesOfPhysical(),
|
||||
storage_snapshot, query_info, context,
|
||||
storage_snapshot, query_info, my_context,
|
||||
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
|
||||
|
||||
auto builder = plan.buildQueryPipeline(
|
||||
QueryPlanOptimizationSettings::fromContext(context),
|
||||
BuildQueryPipelineSettings::fromContext(context));
|
||||
QueryPlanOptimizationSettings::fromContext(my_context),
|
||||
BuildQueryPipelineSettings::fromContext(my_context));
|
||||
|
||||
builder->resize(1);
|
||||
builder->addTransform(std::make_shared<LimitsCheckingTransform>(builder->getHeader(), limits));
|
||||
|
@ -800,11 +800,11 @@ void HTTPHandler::processQuery(
|
||||
if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response"))
|
||||
used_output.out->addHeaderCORS(true);
|
||||
|
||||
auto append_callback = [context = context] (ProgressCallback callback)
|
||||
auto append_callback = [my_context = context] (ProgressCallback callback)
|
||||
{
|
||||
auto prev = context->getProgressCallback();
|
||||
auto prev = my_context->getProgressCallback();
|
||||
|
||||
context->setProgressCallback([prev, callback] (const Progress & progress)
|
||||
my_context->setProgressCallback([prev, callback] (const Progress & progress)
|
||||
{
|
||||
if (prev)
|
||||
prev(progress);
|
||||
|
@ -28,12 +28,12 @@ public:
|
||||
template <typename... TArgs>
|
||||
explicit HandlingRuleHTTPHandlerFactory(TArgs &&... args)
|
||||
{
|
||||
creator = [args = std::tuple<TArgs...>(std::forward<TArgs>(args) ...)]()
|
||||
creator = [my_args = std::tuple<TArgs...>(std::forward<TArgs>(args) ...)]()
|
||||
{
|
||||
return std::apply([&](auto && ... endpoint_args)
|
||||
{
|
||||
return std::make_unique<TEndpoint>(std::forward<decltype(endpoint_args)>(endpoint_args)...);
|
||||
}, std::move(args));
|
||||
}, std::move(my_args));
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -340,10 +340,10 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
|
||||
|
||||
std::atomic<size_t> affected_rows {0};
|
||||
auto prev = query_context->getProgressCallback();
|
||||
query_context->setProgressCallback([&, prev = prev](const Progress & progress)
|
||||
query_context->setProgressCallback([&, my_prev = prev](const Progress & progress)
|
||||
{
|
||||
if (prev)
|
||||
prev(progress);
|
||||
if (my_prev)
|
||||
my_prev(progress);
|
||||
|
||||
affected_rows += progress.written_rows;
|
||||
});
|
||||
|
@ -211,12 +211,12 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
|
||||
if (ctx->getSettingsRef().schema_inference_use_cache_for_hdfs)
|
||||
columns_from_cache = tryGetColumnsFromCache(paths, path_from_uri, last_mod_time, format, ctx);
|
||||
|
||||
ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
|
||||
ReadBufferIterator read_buffer_iterator = [&, my_uri_without_path = uri_without_path, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
|
||||
{
|
||||
if (it == paths.end())
|
||||
return nullptr;
|
||||
auto compression = chooseCompressionMethod(*it, compression_method);
|
||||
auto impl = std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef(), ctx->getReadSettings());
|
||||
auto impl = std::make_unique<ReadBufferFromHDFS>(my_uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef(), ctx->getReadSettings());
|
||||
const Int64 zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
|
||||
return wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max));
|
||||
};
|
||||
|
@ -32,17 +32,17 @@ std::vector<String> AsyncBlockIDsCache::getChildren()
|
||||
auto zookeeper = storage.getZooKeeper();
|
||||
|
||||
auto watch_callback = [last_time = this->last_updatetime.load()
|
||||
, update_min_interval = this->update_min_interval
|
||||
, task = task->shared_from_this()](const Coordination::WatchResponse &)
|
||||
, my_update_min_interval = this->update_min_interval
|
||||
, my_task = task->shared_from_this()](const Coordination::WatchResponse &)
|
||||
{
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
if (now - last_time < update_min_interval)
|
||||
if (now - last_time < my_update_min_interval)
|
||||
{
|
||||
std::chrono::milliseconds sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(update_min_interval - (now - last_time));
|
||||
task->scheduleAfter(sleep_time.count());
|
||||
std::chrono::milliseconds sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(my_update_min_interval - (now - last_time));
|
||||
my_task->scheduleAfter(sleep_time.count());
|
||||
}
|
||||
else
|
||||
task->schedule();
|
||||
my_task->schedule();
|
||||
};
|
||||
std::vector<String> children;
|
||||
Coordination::Stat stat;
|
||||
|
@ -1955,10 +1955,10 @@ try
|
||||
outdated_unloaded_data_parts.pop_back();
|
||||
}
|
||||
|
||||
parts_futures.push_back(runner([&, part = part]()
|
||||
parts_futures.push_back(runner([&, my_part = part]()
|
||||
{
|
||||
auto res = loadDataPartWithRetries(
|
||||
part->info, part->name, part->disk,
|
||||
my_part->info, my_part->name, my_part->disk,
|
||||
DataPartState::Outdated, data_parts_mutex, loading_parts_initial_backoff_ms,
|
||||
loading_parts_max_backoff_ms, loading_parts_max_tries);
|
||||
|
||||
@ -5226,9 +5226,9 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
|
||||
[storage = std::static_pointer_cast<MergeTreeData>(shared_from_this()),
|
||||
backup,
|
||||
part_path_in_backup = data_path_in_backup_fs / part_name,
|
||||
part_info=*part_info,
|
||||
my_part_info = *part_info,
|
||||
restored_parts_holder]
|
||||
{ storage->restorePartFromBackup(restored_parts_holder, part_info, part_path_in_backup); });
|
||||
{ storage->restorePartFromBackup(restored_parts_holder, my_part_info, part_path_in_backup); });
|
||||
|
||||
++num_parts;
|
||||
}
|
||||
|
@ -104,13 +104,13 @@ std::future<MergeTreeReaderPtr> MergeTreePrefetchedReadPool::createPrefetchedRea
|
||||
/// and we cannot block either, therefore make prefetch inside the pool and put the future
|
||||
/// into the read task (MergeTreeReadTask). When a thread calls getTask(), it will wait for
|
||||
/// it (if not yet ready) after getting the task.
|
||||
auto task = [=, reader = std::move(reader), context = getContext()]() mutable -> MergeTreeReaderPtr &&
|
||||
auto task = [=, my_reader = std::move(reader), context = getContext()]() mutable -> MergeTreeReaderPtr &&
|
||||
{
|
||||
/// For async read metrics in system.query_log.
|
||||
PrefetchIncrement watch(context->getAsyncReadCounters());
|
||||
|
||||
reader->prefetchBeginOfRange(priority);
|
||||
return std::move(reader);
|
||||
my_reader->prefetchBeginOfRange(priority);
|
||||
return std::move(my_reader);
|
||||
};
|
||||
return scheduleFromThreadPool<IMergeTreeDataPart::MergeTreeReaderPtr>(std::move(task), prefetch_threadpool, "ReadPrepare", priority);
|
||||
}
|
||||
|
@ -989,8 +989,10 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
|
||||
/// here lambda capture part name, it's ok since we'll not generate new one for this insert,
|
||||
/// see comments around 'part_committed_locally_but_zookeeper' flag
|
||||
retries_ctl.actionAfterLastFailedRetry(
|
||||
[&storage = storage, part_name = part->name]()
|
||||
{ storage.enqueuePartForCheck(part_name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER); });
|
||||
[&my_storage = storage, part_name = part->name]
|
||||
{
|
||||
my_storage.enqueuePartForCheck(part_name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
|
||||
});
|
||||
|
||||
/// We do not know whether or not data has been inserted.
|
||||
retries_ctl.setUserError(
|
||||
|
@ -1115,10 +1115,10 @@ void StorageDistributed::read(
|
||||
}
|
||||
|
||||
additional_shard_filter_generator =
|
||||
[&, custom_key_ast = std::move(custom_key_ast), shard_count = query_info.cluster->getShardCount()](uint64_t shard_num) -> ASTPtr
|
||||
[&, my_custom_key_ast = std::move(custom_key_ast), shard_count = query_info.cluster->getShardCount()](uint64_t shard_num) -> ASTPtr
|
||||
{
|
||||
return getCustomKeyFilterForParallelReplica(
|
||||
shard_count, shard_num - 1, custom_key_ast, settings.parallel_replicas_custom_key_filter_type, *this, local_context);
|
||||
shard_count, shard_num - 1, my_custom_key_ast, settings.parallel_replicas_custom_key_filter_type, *this, local_context);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -9325,9 +9325,9 @@ void StorageReplicatedMergeTree::backupData(
|
||||
/// This task will be executed after all replicas have collected their parts and the coordination is ready to
|
||||
/// give us the final list of parts to add to the BackupEntriesCollector.
|
||||
auto post_collecting_task = [shared_id,
|
||||
replica_name = getReplicaName(),
|
||||
my_replica_name = getReplicaName(),
|
||||
coordination,
|
||||
backup_entries = std::move(backup_entries),
|
||||
my_backup_entries = std::move(backup_entries),
|
||||
&backup_entries_collector]()
|
||||
{
|
||||
Strings data_paths = coordination->getReplicatedDataPaths(shared_id);
|
||||
@ -9336,10 +9336,10 @@ void StorageReplicatedMergeTree::backupData(
|
||||
for (const auto & data_path : data_paths)
|
||||
data_paths_fs.push_back(data_path);
|
||||
|
||||
Strings part_names = coordination->getReplicatedPartNames(shared_id, replica_name);
|
||||
Strings part_names = coordination->getReplicatedPartNames(shared_id, my_replica_name);
|
||||
std::unordered_set<std::string_view> part_names_set{part_names.begin(), part_names.end()};
|
||||
|
||||
for (const auto & [relative_path, backup_entry] : backup_entries)
|
||||
for (const auto & [relative_path, backup_entry] : my_backup_entries)
|
||||
{
|
||||
size_t slash_pos = relative_path.find('/');
|
||||
String part_name = relative_path.substr(0, slash_pos);
|
||||
@ -9349,7 +9349,7 @@ void StorageReplicatedMergeTree::backupData(
|
||||
backup_entries_collector.addBackupEntry(data_path / relative_path, backup_entry);
|
||||
}
|
||||
|
||||
auto mutation_infos = coordination->getReplicatedMutations(shared_id, replica_name);
|
||||
auto mutation_infos = coordination->getReplicatedMutations(shared_id, my_replica_name);
|
||||
for (const auto & mutation_info : mutation_infos)
|
||||
{
|
||||
auto backup_entry = ReplicatedMergeTreeMutationEntry::parse(mutation_info.entry, mutation_info.id).backup();
|
||||
|
@ -175,12 +175,12 @@ Pipe StorageSystemReplicas::read(
|
||||
|
||||
for (size_t i = 0; i < tables_size; ++i)
|
||||
{
|
||||
thread_pool.scheduleOrThrowOnError([&, i=i]
|
||||
thread_pool.scheduleOrThrowOnError([&, my_i = i]
|
||||
{
|
||||
dynamic_cast<StorageReplicatedMergeTree &>(
|
||||
*replicated_tables
|
||||
[(*col_database)[i].safeGet<const String &>()]
|
||||
[(*col_table)[i].safeGet<const String &>()]).getStatus(statuses[i], with_zk_fields);
|
||||
[(*col_database)[my_i].safeGet<const String &>()]
|
||||
[(*col_table)[my_i].safeGet<const String &>()]).getStatus(statuses[my_i], with_zk_fields);
|
||||
});
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user