Log dictionary name during loading

This commit is contained in:
vdimir 2024-02-22 14:58:01 +00:00
parent c3f925da0a
commit 4a7ad15a9a
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
2 changed files with 12 additions and 7 deletions

View File

@ -777,7 +777,6 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::loadData()
{
if (!source_ptr->hasUpdateField())
{
std::optional<DictionaryParallelLoaderType> parallel_loader;
if constexpr (sharded)
parallel_loader.emplace(*this);
@ -790,6 +789,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::loadData()
size_t total_rows = 0;
size_t total_blocks = 0;
String dictionary_name = getFullName();
Block block;
while (true)
@ -809,7 +809,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::loadData()
if (parallel_loader)
{
parallel_loader->addBlock(block);
parallel_loader->addBlock(std::move(block));
}
else
{
@ -822,10 +822,12 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::loadData()
if (parallel_loader)
parallel_loader->finish();
LOG_DEBUG(getLogger("HashedArrayDictionary"),
"Finished {}reading {} blocks with {} rows from pipeline in {:.2f} sec and inserted into hashtable in {:.2f} sec",
LOG_DEBUG(log,
"Finished {}reading {} blocks with {} rows to dictionary {} from pipeline in {:.2f} sec and inserted into hashtable in {:.2f} sec",
configuration.use_async_executor ? "asynchronous " : "",
total_blocks, total_rows, pull_time_microseconds / 1000000.0, process_time_microseconds / 1000000.0);
total_blocks, total_rows,
dictionary_name,
pull_time_microseconds / 1000000.0, process_time_microseconds / 1000000.0);
}
else
{

View File

@ -46,12 +46,13 @@ class HashedDictionaryParallelLoader : public boost::noncopyable
public:
explicit HashedDictionaryParallelLoader(DictionaryType & dictionary_)
: dictionary(dictionary_)
, dictionary_name(dictionary.getFullName())
, shards(dictionary.configuration.shards)
, pool(CurrentMetrics::HashedDictionaryThreads, CurrentMetrics::HashedDictionaryThreadsActive, CurrentMetrics::HashedDictionaryThreadsScheduled, shards)
, shards_queues(shards)
{
UInt64 backlog = dictionary.configuration.shard_load_queue_backlog;
LOG_TRACE(dictionary.log, "Will load the dictionary using {} threads (with {} backlog)", shards, backlog);
LOG_TRACE(dictionary.log, "Will load the {} dictionary using {} threads (with {} backlog)", dictionary_name, shards, backlog);
shards_slots.resize(shards);
iota(shards_slots.data(), shards_slots.size(), UInt64(0));
@ -82,6 +83,7 @@ public:
{
IColumn::Selector selector = createShardSelector(block, shards_slots);
Blocks shards_blocks = splitBlock(selector, block);
block.clear();
for (size_t shard = 0; shard < shards; ++shard)
{
@ -98,7 +100,7 @@ public:
Stopwatch watch;
pool.wait();
UInt64 elapsed_ms = watch.elapsedMilliseconds();
LOG_TRACE(dictionary.log, "Processing the tail took {}ms", elapsed_ms);
LOG_TRACE(dictionary.log, "Processing the tail of dictionary {} took {}ms", dictionary_name, elapsed_ms);
}
~HashedDictionaryParallelLoader()
@ -119,6 +121,7 @@ public:
private:
DictionaryType & dictionary;
String dictionary_name;
const size_t shards;
ThreadPool pool;
std::vector<std::optional<ConcurrentBoundedQueue<Block>>> shards_queues;