Merge branch 'master' into fix-ibm

This commit is contained in:
Alexey Milovidov 2023-04-23 14:26:15 +02:00
commit 3621a4d7ac
8 changed files with 186 additions and 83 deletions

View File

@ -12,7 +12,7 @@
--chart-background: white;
--shadow-color: rgba(0, 0, 0, 0.25);
--input-shadow-color: rgba(0, 255, 0, 1);
--error-color: red;
--error-color: white;
--legend-background: rgba(255, 255, 255, 0.75);
--title-color: #666;
--text-color: black;
@ -76,7 +76,7 @@
#charts
{
height: 100%;
display: none;
display: flex;
flex-flow: row wrap;
gap: 1rem;
}
@ -121,6 +121,19 @@
.unconnected #url {
width: 100%;
}
.unconnected #button-options {
display: grid;
grid-auto-flow: column;
grid-auto-columns: 1fr;
gap: 0.3rem;
}
.unconnected #user {
margin-right: 0;
width: auto;
}
.unconnected #password {
width: auto;
}
#user {
margin-right: 0.25rem;
width: 50%;
@ -136,7 +149,15 @@
width: 100%;
display: flex;
flex-flow: row nowrap;
flex-flow: row nowrap;
}
.unconnected #username-password {
width: 100%;
gap: 0.3rem;
display: grid;
grid-template-columns: 1fr 1fr;
}
.inputs #chart-params {
@ -177,7 +198,10 @@
.themes {
float: right;
font-size: 20pt;
margin-bottom: 1rem;
gap: 0.3rem;
display: flex;
justify-content: center;
}
#toggle-dark, #toggle-light {
@ -206,6 +230,8 @@
}
#add, #reload {
padding: .25rem 0.5rem;
text-align: center;
font-weight: bold;
user-select: none;
cursor: pointer;
@ -214,16 +240,24 @@
background: var(--new-chart-background-color);
color: var(--new-chart-text-color);
float: right;
margin-right: 0 !important;
margin-left: 1rem;
margin-right: 1rem !important;
margin-left: 0rem;
margin-bottom: 1rem;
}
/* .unconnected #reload {
margin-left: 3px;
} */
#add:hover, #reload:hover {
background: var(--button-background-color);
}
#auth-error {
align-self: center;
width: 60%;
padding: .5rem;
color: var(--error-color);
display: flex;
@ -352,15 +386,15 @@
<input spellcheck="false" id="password" type="password" placeholder="password" />
</div>
</div>
<div>
<input id="reload" type="button" value="Reload">
<input id="add" type="button" value="Add chart" style="display: none;">
<div id="button-options">
<span class="nowrap themes"><span id="toggle-dark">🌚</span><span id="toggle-light">🌞</span></span>
<input id="add" type="button" value="Add chart" style="display: none;">
<input id="reload" type="button" value="Reload">
<div id="chart-params"></div>
</div>
</form>
<div id="auth-error"></div>
</div>
<div id="auth-error"></div>
<div id="charts"></div>
<script>
@ -390,10 +424,21 @@ if (location.protocol != 'file:') {
user = 'default';
}
const errorCodeRegex = /Code: (\d+)/
const errorCodeMessageMap = {
516: 'Error authenticating with database. Please check your connection params and try again.'
}
const errorMessages = [
{
regex: /TypeError: Failed to fetch/,
messageFunc: () => 'Error authenticating with database. Please check your connection url and try again.',
},
{
regex: /Code: (\d+)/,
messageFunc: (match) => {
return errorCodeMessageMap[match[1]]
}
}
]
/// This is just a demo configuration of the dashboard.
@ -769,21 +814,6 @@ document.getElementById('reload').addEventListener('click', e => {
reloadAll();
});
function showReloadIfNeeded() {
const is_any_field_changed = (host != document.getElementById('url').value
|| user != document.getElementById('user').value
|| password != document.getElementById('password').value);
if (is_any_field_changed) {
document.getElementById('reload').style.display = '';
} else {
document.getElementById('reload').style.display = 'none';
}
}
document.getElementById('password').addEventListener('input', e => { showReloadIfNeeded(); })
document.getElementById('user').addEventListener('input', e => { showReloadIfNeeded(); })
document.getElementById('url').addEventListener('input', e => { showReloadIfNeeded(); })
function legendAsTooltipPlugin({ className, style = { background: "var(--legend-background)" } } = {}) {
let legendEl;
@ -875,14 +905,12 @@ async function draw(idx, chart, url_params, query) {
}
if (error) {
const errorMatch = error.match(errorCodeRegex)
if (errorMatch && errorMatch[1]) {
const code = errorMatch[1]
if (errorCodeMessageMap[code]) {
const authError = new Error(errorCodeMessageMap[code])
authError.code = code
throw authError
}
const errorMatch = errorMessages.find(({ regex }) => error.match(regex))
if (errorMatch) {
const match = error.match(errorMatch.regex)
const message = errorMatch.messageFunc(match)
const authError = new Error(message)
throw authError
}
}
@ -950,7 +978,8 @@ async function draw(idx, chart, url_params, query) {
function showAuthError(message) {
const charts = document.querySelector('#charts');
charts.style.display = 'none';
charts.style.height = '0px';
charts.style.opacity = '0';
const add = document.querySelector('#add');
add.style.display = 'none';
@ -961,7 +990,8 @@ function showAuthError(message) {
function hideAuthError() {
const charts = document.querySelector('#charts');
charts.style.display = 'flex';
charts.style.height = 'auto';
charts.style.opacity = '1';
const authError = document.querySelector('#auth-error');
authError.textContent = '';
@ -972,13 +1002,13 @@ let firstLoad = true;
async function drawAll() {
let params = getParamsForURL();
const charts = document.getElementsByClassName('chart');
const chartsArray = document.getElementsByClassName('chart');
if (!firstLoad) {
hideAuthError();
}
await Promise.all([...Array(queries.length)].map(async (_, i) => {
return draw(i, charts[i], params, queries[i].query).catch((e) => {
return draw(i, chartsArray[i], params, queries[i].query).catch((e) => {
if (!firstLoad) {
showAuthError(e.message);
}
@ -995,6 +1025,9 @@ async function drawAll() {
element.classList.remove('unconnected');
const add = document.querySelector('#add');
add.style.display = 'block';
} else {
const charts = document.querySelector('#charts')
charts.style.height = '0px';
}
})
}

View File

@ -667,6 +667,7 @@ class IColumn;
M(Bool, allow_prefetched_read_pool_for_remote_filesystem, false, "Prefer prefethed threadpool if all parts are on remote filesystem", 0) \
M(Bool, allow_prefetched_read_pool_for_local_filesystem, false, "Prefer prefethed threadpool if all parts are on remote filesystem", 0) \
\
M(UInt64, prefetch_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the prefetch buffer to read from the filesystem.", 0) \
M(UInt64, filesystem_prefetch_step_bytes, 0, "Prefetch step in bytes. Zero means `auto` - approximately the best prefetch step will be auto deduced, but might not be 100% the best. The actual value might be different because of setting filesystem_prefetch_min_bytes_for_single_read_task", 0) \
M(UInt64, filesystem_prefetch_step_marks, 0, "Prefetch step in marks. Zero means `auto` - approximately the best prefetch step will be auto deduced, but might not be 100% the best. The actual value might be different because of setting filesystem_prefetch_min_bytes_for_single_read_task", 0) \
M(UInt64, filesystem_prefetch_min_bytes_for_single_read_task, "8Mi", "Do not parallelize within one file read less than this amount of bytes. E.g. one reader will not receive a read task of size less than this amount. This setting is recommended to avoid spikes of time for aws getObject requests to aws", 0) \

View File

@ -53,7 +53,7 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
, reader(reader_)
, base_priority(settings_.priority)
, impl(impl_)
, prefetch_buffer(settings_.remote_fs_buffer_size)
, prefetch_buffer(settings_.prefetch_buffer_size)
, min_bytes_for_seek(min_bytes_for_seek_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
@ -139,7 +139,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch(int64_t priority)
last_prefetch_info.priority = priority;
/// Prefetch even in case hasPendingData() == true.
chassert(prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
chassert(prefetch_buffer.size() == read_settings.prefetch_buffer_size || prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
@ -224,7 +224,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);
chassert(memory.size() == read_settings.remote_fs_buffer_size);
chassert(memory.size() == read_settings.prefetch_buffer_size || memory.size() == read_settings.remote_fs_buffer_size);
std::tie(size, offset) = impl->readInto(memory.data(), memory.size(), file_offset_of_buffer_end, bytes_to_ignore);
bytes_to_ignore = 0;

View File

@ -70,6 +70,7 @@ struct ReadSettings
size_t local_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
size_t remote_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
size_t prefetch_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
bool local_fs_prefetch = false;
bool remote_fs_prefetch = false;
@ -127,6 +128,7 @@ struct ReadSettings
ReadSettings res = *this;
res.local_fs_buffer_size = std::min(std::max(1ul, file_size), local_fs_buffer_size);
res.remote_fs_buffer_size = std::min(std::max(1ul, file_size), remote_fs_buffer_size);
res.prefetch_buffer_size = std::min(std::max(1ul, file_size), prefetch_buffer_size);
return res;
}
};

View File

@ -4285,6 +4285,7 @@ ReadSettings Context::getReadSettings() const
res.local_fs_buffer_size = settings.max_read_buffer_size;
res.remote_fs_buffer_size = settings.max_read_buffer_size;
res.prefetch_buffer_size = settings.prefetch_buffer_size;
res.direct_io_threshold = settings.min_bytes_to_use_direct_io;
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;
res.priority = settings.read_priority;

View File

@ -24,6 +24,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
@ -80,6 +81,9 @@ struct MergeTreePrefetchedReadPool::PartInfo
size_t approx_size_of_mark = 0;
size_t prefetch_step_marks = 0;
size_t estimated_memory_usage_for_single_prefetch = 0;
size_t required_readers_num = 0;
};
std::future<MergeTreeReaderPtr> MergeTreePrefetchedReadPool::createPrefetchedReader(
@ -136,14 +140,37 @@ void MergeTreePrefetchedReadPool::createPrefetchedReaderForTask(MergeTreeReadTas
bool MergeTreePrefetchedReadPool::TaskHolder::operator <(const TaskHolder & other) const
{
return task->priority < other.task->priority;
chassert(task->priority >= 0);
chassert(other.task->priority >= 0);
return -task->priority < -other.task->priority; /// Less is better.
/// With default std::priority_queue, top() returns largest element.
/// So closest to 0 will be on top with this comparator.
}
void MergeTreePrefetchedReadPool::startPrefetches() const
{
for (const auto & task : prefetch_queue)
if (prefetch_queue.empty())
return;
[[maybe_unused]] TaskHolder prev(nullptr, 0);
[[maybe_unused]] const int64_t highest_priority = reader_settings.read_settings.priority + 1;
assert(prefetch_queue.top().task->priority == highest_priority);
while (!prefetch_queue.empty())
{
createPrefetchedReaderForTask(*task.task);
const auto & top = prefetch_queue.top();
createPrefetchedReaderForTask(*top.task);
#ifndef NDEBUG
if (prev.task)
{
assert(top.task->priority >= highest_priority);
if (prev.thread_id == top.thread_id)
{
assert(prev.task->priority < top.task->priority);
}
}
prev = top;
#endif
prefetch_queue.pop();
}
}
@ -156,8 +183,8 @@ MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::getTask(size_t thread)
if (!started_prefetches)
{
startPrefetches();
started_prefetches = true;
startPrefetches();
}
auto it = threads_tasks.find(thread);
@ -270,29 +297,10 @@ MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::getTask(size_t thread)
size_t MergeTreePrefetchedReadPool::getApproxSizeOfGranule(const IMergeTreeDataPart & part) const
{
const auto & columns = part.getColumns();
auto all_columns_are_fixed_size = columns.end() == std::find_if(
columns.begin(), columns.end(),
[](const auto & col){ return col.type->haveMaximumSizeOfValue() == false; });
if (all_columns_are_fixed_size)
{
size_t approx_size = 0;
for (const auto & col : columns)
approx_size += col.type->getMaximumSizeOfValueInMemory() * fixed_index_granularity;
if (!index_granularity_bytes)
return approx_size;
return std::min(index_granularity_bytes, approx_size);
}
const size_t approx_size = static_cast<size_t>(std::round(static_cast<double>(part.getBytesOnDisk()) / part.getMarksCount()));
if (!index_granularity_bytes)
return approx_size;
return std::min(index_granularity_bytes, approx_size);
ColumnSize columns_size{};
for (const auto & col_name : column_names)
columns_size.add(part.getColumnSize(col_name));
return columns_size.data_compressed / part.getMarksCount();
}
MergeTreePrefetchedReadPool::PartsInfos MergeTreePrefetchedReadPool::getPartsInfos(
@ -300,6 +308,7 @@ MergeTreePrefetchedReadPool::PartsInfos MergeTreePrefetchedReadPool::getPartsInf
{
PartsInfos result;
Block sample_block = storage_snapshot->metadata->getSampleBlock();
const auto & settings = getContext()->getSettingsRef();
const bool predict_block_size_bytes = preferred_block_size_bytes > 0;
for (const auto & part : parts)
@ -338,6 +347,37 @@ MergeTreePrefetchedReadPool::PartsInfos MergeTreePrefetchedReadPool::getPartsInf
part_info->column_name_set = {required_column_names.begin(), required_column_names.end()};
part_info->task_columns = task_columns;
/// adjustBufferSize(), which is done in MergeTreeReaderStream and MergeTreeReaderCompact,
/// lowers buffer size if file size (or required read range) is less. So we know that the
/// settings.prefetch_buffer_size will be lowered there, therefore we account it here as well.
/// But here we make a more approximate lowering (because we do not have loaded marks yet),
/// while in adjustBufferSize it will be presize.
for (const auto & col : task_columns.columns)
{
const auto col_size = part.data_part->getColumnSize(col.name).data_compressed;
part_info->estimated_memory_usage_for_single_prefetch += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++part_info->required_readers_num;
}
if (reader_settings.apply_deleted_mask && part.data_part->hasLightweightDelete())
{
const auto col_size = part.data_part->getColumnSize(
LightweightDeleteDescription::FILTER_COLUMN.name).data_compressed;
part_info->estimated_memory_usage_for_single_prefetch += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++part_info->required_readers_num;
}
if (prewhere_info)
{
for (const auto & columns : task_columns.pre_columns)
{
for (const auto & col : columns)
{
const size_t col_size = part.data_part->getColumnSize(col.name).data_compressed;
part_info->estimated_memory_usage_for_single_prefetch += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++part_info->required_readers_num;
}
}
}
result.push_back(std::move(part_info));
}
@ -423,10 +463,14 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
"Sum marks: {}, threads: {}, min_marks_per_thread: {}, result prefetch step marks: {}, prefetches limit: {}, total_size_approx: {}",
sum_marks, threads, min_marks_per_thread, settings.filesystem_prefetch_step_bytes, settings.filesystem_prefetches_limit, total_size_approx);
size_t current_prefetches_count = 0;
size_t allowed_memory_usage = settings.filesystem_prefetch_max_memory_usage;
if (!allowed_memory_usage)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `filesystem_prefetch_max_memory_usage` must be non-zero");
std::optional<size_t> allowed_prefetches_num = settings.filesystem_prefetches_limit
? std::optional<size_t>(settings.filesystem_prefetches_limit)
: std::nullopt;
ThreadsTasks result_threads_tasks;
size_t memory_usage_approx = 0;
for (size_t i = 0, part_idx = 0; i < threads && part_idx < parts_infos.size(); ++i)
{
auto need_marks = min_marks_per_thread;
@ -514,22 +558,25 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
read_task->priority = priority;
bool allow_prefetch = !settings.filesystem_prefetches_limit || current_prefetches_count + 1 <= settings.filesystem_prefetches_limit;
if (allow_prefetch && settings.filesystem_prefetch_max_memory_usage)
bool allow_prefetch = false;
if (allowed_memory_usage
&& (allowed_prefetches_num.has_value() == false || allowed_prefetches_num.value() > 0))
{
size_t num_readers = 1;
if (reader_settings.apply_deleted_mask && part.data_part->hasLightweightDelete())
++num_readers;
if (prewhere_info)
num_readers += part.task_columns.pre_columns.size();
memory_usage_approx += settings.max_read_buffer_size * num_readers;
allow_prefetch = part.estimated_memory_usage_for_single_prefetch <= allowed_memory_usage
&& (allowed_prefetches_num.has_value() == false
|| part.required_readers_num <= allowed_prefetches_num.value());
allow_prefetch = memory_usage_approx <= settings.filesystem_prefetch_max_memory_usage;
if (allow_prefetch)
{
allowed_memory_usage -= part.estimated_memory_usage_for_single_prefetch;
if (allowed_prefetches_num.has_value())
*allowed_prefetches_num -= part.required_readers_num;
}
}
if (allow_prefetch)
{
prefetch_queue.emplace(TaskHolder(read_task.get()));
++current_prefetches_count;
prefetch_queue.emplace(TaskHolder(read_task.get(), i));
}
++priority;

View File

@ -102,11 +102,12 @@ private:
struct TaskHolder
{
explicit TaskHolder(MergeTreeReadTask * task_) : task(task_) {}
explicit TaskHolder(MergeTreeReadTask * task_, size_t thread_id_) : task(task_), thread_id(thread_id_) {}
MergeTreeReadTask * task;
size_t thread_id;
bool operator <(const TaskHolder & other) const;
};
mutable boost::heap::priority_queue<TaskHolder> prefetch_queue;
mutable std::priority_queue<TaskHolder> prefetch_queue; /// the smallest on top
bool started_prefetches = false;
/// A struct which allows to track max number of tasks which were in the

View File

@ -547,6 +547,24 @@ class SettingsRandomizer:
"remote_filesystem_read_method": lambda: random.choice(["read", "threadpool"]),
"local_filesystem_read_prefetch": lambda: random.randint(0, 1),
"remote_filesystem_read_prefetch": lambda: random.randint(0, 1),
"allow_prefetched_read_pool_for_remote_filesystem": lambda: random.randint(
0, 1
),
"filesystem_prefetch_max_memory_usage": lambda: random.choice(
["10Mi", "100Mi", "500Mi"]
),
"filesystem_prefetches_limit": lambda: random.choice(
[0, 10]
), # 0 means unlimited (but anyway limited by prefetch_max_memory_usage)
"filesystem_prefetch_min_bytes_for_single_read_task": lambda: random.choice(
["1Mi", "8Mi", "16Mi"]
),
"filesystem_prefetch_step_marks": lambda: random.choice(
[0, 50]
), # 0 means 'auto'
"filesystem_prefetch_step_bytes": lambda: random.choice(
[0, "100Mi"]
), # 0 means 'auto'
"compile_expressions": lambda: random.randint(0, 1),
"compile_aggregate_expressions": lambda: random.randint(0, 1),
"compile_sort_description": lambda: random.randint(0, 1),