mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge branch 'master' into fix-test-profile-events-s3
This commit is contained in:
commit
141d6b835c
@ -19,8 +19,19 @@ Standard aggregate functions:
|
||||
- [stddevSamp](/docs/en/sql-reference/aggregate-functions/reference/stddevsamp.md)
|
||||
- [varPop](/docs/en/sql-reference/aggregate-functions/reference/varpop.md)
|
||||
- [varSamp](/docs/en/sql-reference/aggregate-functions/reference/varsamp.md)
|
||||
- [corr](./corr.md)
|
||||
- [covarPop](/docs/en/sql-reference/aggregate-functions/reference/covarpop.md)
|
||||
- [covarSamp](/docs/en/sql-reference/aggregate-functions/reference/covarsamp.md)
|
||||
- [entropy](./entropy.md)
|
||||
- [exponentialMovingAverage](./exponentialmovingaverage.md)
|
||||
- [intervalLengthSum](./intervalLengthSum.md)
|
||||
- [kolmogorovSmirnovTest](./kolmogorovsmirnovtest.md)
|
||||
- [mannwhitneyutest](./mannwhitneyutest.md)
|
||||
- [median](./median.md)
|
||||
- [rankCorr](./rankCorr.md)
|
||||
- [sumKahan](./sumkahan.md)
|
||||
- [studentTTest](./studentttest.md)
|
||||
- [welchTTest](./welchttest.md)
|
||||
|
||||
ClickHouse-specific aggregate functions:
|
||||
|
||||
@ -34,12 +45,15 @@ ClickHouse-specific aggregate functions:
|
||||
- [avgWeighted](/docs/en/sql-reference/aggregate-functions/reference/avgweighted.md)
|
||||
- [topK](/docs/en/sql-reference/aggregate-functions/reference/topk.md)
|
||||
- [topKWeighted](/docs/en/sql-reference/aggregate-functions/reference/topkweighted.md)
|
||||
- [deltaSum](./deltasum.md)
|
||||
- [deltaSumTimestamp](./deltasumtimestamp.md)
|
||||
- [groupArray](/docs/en/sql-reference/aggregate-functions/reference/grouparray.md)
|
||||
- [groupArrayLast](/docs/en/sql-reference/aggregate-functions/reference/grouparraylast.md)
|
||||
- [groupUniqArray](/docs/en/sql-reference/aggregate-functions/reference/groupuniqarray.md)
|
||||
- [groupArrayInsertAt](/docs/en/sql-reference/aggregate-functions/reference/grouparrayinsertat.md)
|
||||
- [groupArrayMovingAvg](/docs/en/sql-reference/aggregate-functions/reference/grouparraymovingavg.md)
|
||||
- [groupArrayMovingSum](/docs/en/sql-reference/aggregate-functions/reference/grouparraymovingsum.md)
|
||||
- [groupArraySample](./grouparraysample.md)
|
||||
- [groupBitAnd](/docs/en/sql-reference/aggregate-functions/reference/groupbitand.md)
|
||||
- [groupBitOr](/docs/en/sql-reference/aggregate-functions/reference/groupbitor.md)
|
||||
- [groupBitXor](/docs/en/sql-reference/aggregate-functions/reference/groupbitxor.md)
|
||||
@ -84,3 +98,9 @@ ClickHouse-specific aggregate functions:
|
||||
- [theilsU](./theilsu.md)
|
||||
- [maxIntersections](./maxintersections.md)
|
||||
- [maxIntersectionsPosition](./maxintersectionsposition.md)
|
||||
- [meanZTest](./meanztest.md)
|
||||
- [quantileGK](./quantileGK.md)
|
||||
- [quantileInterpolatedWeighted](./quantileinterpolatedweighted.md)
|
||||
- [sparkBar](./sparkbar.md)
|
||||
- [sumCount](./sumcount.md)
|
||||
|
||||
|
@ -14,6 +14,7 @@ public:
|
||||
virtual bool isValid() = 0;
|
||||
virtual RelativePathWithMetadata current() = 0;
|
||||
virtual RelativePathsWithMetadata currentBatch() = 0;
|
||||
virtual std::optional<RelativePathsWithMetadata> getCurrrentBatchAndScheduleNext() = 0;
|
||||
virtual size_t getAccumulatedSize() const = 0;
|
||||
|
||||
virtual ~IObjectStorageIterator() = default;
|
||||
@ -53,6 +54,11 @@ public:
|
||||
return batch;
|
||||
}
|
||||
|
||||
virtual std::optional<RelativePathsWithMetadata> getCurrrentBatchAndScheduleNext() override
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
size_t getAccumulatedSize() const override
|
||||
{
|
||||
return batch.size();
|
||||
|
@ -100,6 +100,22 @@ RelativePathsWithMetadata IObjectStorageIteratorAsync::currentBatch()
|
||||
return current_batch;
|
||||
}
|
||||
|
||||
std::optional<RelativePathsWithMetadata> IObjectStorageIteratorAsync::getCurrrentBatchAndScheduleNext()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (!is_initialized)
|
||||
nextBatch();
|
||||
|
||||
if (current_batch_iterator != current_batch.end())
|
||||
{
|
||||
auto temp_current_batch = current_batch;
|
||||
nextBatch();
|
||||
return temp_current_batch;
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
size_t IObjectStorageIteratorAsync::getAccumulatedSize() const
|
||||
{
|
||||
return accumulated_size.load(std::memory_order_relaxed);
|
||||
|
@ -27,6 +27,7 @@ public:
|
||||
RelativePathWithMetadata current() override;
|
||||
RelativePathsWithMetadata currentBatch() override;
|
||||
size_t getAccumulatedSize() const override;
|
||||
std::optional<RelativePathsWithMetadata> getCurrrentBatchAndScheduleNext() override;
|
||||
|
||||
~IObjectStorageIteratorAsync() override
|
||||
{
|
||||
@ -48,7 +49,7 @@ protected:
|
||||
bool is_initialized{false};
|
||||
bool is_finished{false};
|
||||
|
||||
mutable std::mutex mutex;
|
||||
mutable std::recursive_mutex mutex;
|
||||
ThreadPool list_objects_pool;
|
||||
ThreadPoolCallbackRunner<BatchAndHasNext> list_objects_scheduler;
|
||||
std::future<BatchAndHasNext> outcome_future;
|
||||
|
@ -498,12 +498,14 @@ KeyMetadata::iterator FileCache::addFileSegment(
|
||||
chassert(size > 0); /// Empty file segments in cache are not allowed.
|
||||
|
||||
const auto & key = locked_key.getKey();
|
||||
if (locked_key.tryGetByOffset(offset))
|
||||
const FileSegment::Range range(offset, offset + size - 1);
|
||||
|
||||
if (auto intersecting_range = locked_key.hasIntersectingRange(range))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Cache entry already exists for key: `{}`, offset: {}, size: {}.",
|
||||
key, offset, size);
|
||||
"Attempt to add intersecting file segment in cache ({} intersects {})",
|
||||
range.toString(), intersecting_range->toString());
|
||||
}
|
||||
|
||||
FileSegment::State result_state;
|
||||
@ -958,8 +960,20 @@ void FileCache::loadMetadata()
|
||||
if ((main_priority->getSizeLimit() == 0 || main_priority->getSize(lock) + size <= main_priority->getSizeLimit())
|
||||
&& (main_priority->getElementsLimit() == 0 || main_priority->getElementsCount(lock) + 1 <= main_priority->getElementsLimit()))
|
||||
{
|
||||
auto file_segment_metadata_it = addFileSegment(
|
||||
*locked_key, offset, size, FileSegment::State::DOWNLOADED, CreateFileSegmentSettings(segment_kind), &lock);
|
||||
KeyMetadata::iterator file_segment_metadata_it;
|
||||
try
|
||||
{
|
||||
file_segment_metadata_it = addFileSegment(
|
||||
*locked_key, offset, size, FileSegment::State::DOWNLOADED, CreateFileSegmentSettings(segment_kind), &lock);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
chassert(false);
|
||||
|
||||
fs::remove(offset_it->path());
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto & file_segment_metadata = file_segment_metadata_it->second;
|
||||
chassert(file_segment_metadata->file_segment->assertCorrectness());
|
||||
|
@ -129,6 +129,8 @@ public:
|
||||
|
||||
bool operator==(const Range & other) const { return left == other.left && right == other.right; }
|
||||
|
||||
bool operator<(const Range & other) const { return right < other.left; }
|
||||
|
||||
size_t size() const { return right - left + 1; }
|
||||
|
||||
String toString() const { return fmt::format("[{}, {}]", std::to_string(left), std::to_string(right)); }
|
||||
|
@ -448,6 +448,29 @@ void LockedKey::shrinkFileSegmentToDownloadedSize(
|
||||
chassert(file_segment->assertCorrectnessUnlocked(segment_lock));
|
||||
}
|
||||
|
||||
std::optional<FileSegment::Range> LockedKey::hasIntersectingRange(const FileSegment::Range & range) const
|
||||
{
|
||||
if (key_metadata->empty())
|
||||
return {};
|
||||
|
||||
auto it = key_metadata->lower_bound(range.left);
|
||||
if (it != key_metadata->end()) /// has next range
|
||||
{
|
||||
auto next_range = it->second->file_segment->range();
|
||||
if (!(range < next_range))
|
||||
return next_range;
|
||||
|
||||
if (it == key_metadata->begin())
|
||||
return {};
|
||||
}
|
||||
|
||||
auto prev_range = std::prev(it)->second->file_segment->range();
|
||||
if (!(prev_range < range))
|
||||
return prev_range;
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
std::shared_ptr<const FileSegmentMetadata> LockedKey::getByOffset(size_t offset) const
|
||||
{
|
||||
auto it = key_metadata->find(offset);
|
||||
|
@ -164,6 +164,8 @@ struct LockedKey : private boost::noncopyable
|
||||
|
||||
bool isLastOwnerOfFileSegment(size_t offset) const;
|
||||
|
||||
std::optional<FileSegment::Range> hasIntersectingRange(const FileSegment::Range & range) const;
|
||||
|
||||
void removeFromCleanupQueue();
|
||||
|
||||
void markAsRemoved();
|
||||
|
@ -246,6 +246,7 @@ ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr q
|
||||
priorities.insert(static_cast<int>(settings.priority)),
|
||||
std::move(thread_group),
|
||||
query_kind,
|
||||
settings,
|
||||
watch_start_nanoseconds));
|
||||
|
||||
increaseQueryKindAmount(query_kind);
|
||||
@ -342,6 +343,7 @@ QueryStatus::QueryStatus(
|
||||
QueryPriorities::Handle && priority_handle_,
|
||||
ThreadGroupPtr && thread_group_,
|
||||
IAST::QueryKind query_kind_,
|
||||
const Settings & query_settings_,
|
||||
UInt64 watch_start_nanoseconds)
|
||||
: WithContext(context_)
|
||||
, query(query_)
|
||||
@ -353,9 +355,11 @@ QueryStatus::QueryStatus(
|
||||
, query_kind(query_kind_)
|
||||
, num_queries_increment(CurrentMetrics::Query)
|
||||
{
|
||||
auto settings = getContext()->getSettings();
|
||||
limits.max_execution_time = settings.max_execution_time;
|
||||
overflow_mode = settings.timeout_overflow_mode;
|
||||
/// We have to pass `query_settings_` to this constructor because we can't use `context_->getSettings().max_execution_time` here:
|
||||
/// a QueryStatus is created with `ProcessList::mutex` locked (see ProcessList::insert) and calling `context_->getSettings()`
|
||||
/// would lock the context's lock too, whereas holding two those locks simultaneously is not good.
|
||||
limits.max_execution_time = query_settings_.max_execution_time;
|
||||
overflow_mode = query_settings_.timeout_overflow_mode;
|
||||
}
|
||||
|
||||
QueryStatus::~QueryStatus()
|
||||
@ -589,10 +593,13 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
|
||||
res.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_group->performance_counters.getPartiallyAtomicSnapshot());
|
||||
}
|
||||
|
||||
if (get_settings && getContext())
|
||||
if (get_settings)
|
||||
{
|
||||
res.query_settings = std::make_shared<Settings>(getContext()->getSettings());
|
||||
res.current_database = getContext()->getCurrentDatabase();
|
||||
if (auto ctx = context.lock())
|
||||
{
|
||||
res.query_settings = std::make_shared<Settings>(ctx->getSettings());
|
||||
res.current_database = ctx->getCurrentDatabase();
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
@ -601,12 +608,18 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
|
||||
|
||||
ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_events, bool get_settings) const
|
||||
{
|
||||
/// We have to copy `processes` first because `process->getInfo()` below can access the context to get the query settings,
|
||||
/// and it's better not to keep the process list's lock while doing that.
|
||||
std::vector<QueryStatusPtr> processes_copy;
|
||||
|
||||
{
|
||||
auto lock = safeLock();
|
||||
processes_copy.assign(processes.begin(), processes.end());
|
||||
}
|
||||
|
||||
Info per_query_infos;
|
||||
|
||||
auto lock = safeLock();
|
||||
|
||||
per_query_infos.reserve(processes.size());
|
||||
for (const auto & process : processes)
|
||||
per_query_infos.reserve(processes_copy.size());
|
||||
for (const auto & process : processes_copy)
|
||||
per_query_infos.emplace_back(process->getInfo(get_thread_list, get_profile_events, get_settings));
|
||||
|
||||
return per_query_infos;
|
||||
|
@ -164,6 +164,7 @@ public:
|
||||
QueryPriorities::Handle && priority_handle_,
|
||||
ThreadGroupPtr && thread_group_,
|
||||
IAST::QueryKind query_kind_,
|
||||
const Settings & query_settings_,
|
||||
UInt64 watch_start_nanoseconds);
|
||||
|
||||
~QueryStatus();
|
||||
|
@ -624,19 +624,19 @@ Pipe StorageAzureBlob::read(
|
||||
requested_virtual_columns.push_back(virtual_column);
|
||||
}
|
||||
|
||||
std::shared_ptr<StorageAzureBlobSource::Iterator> iterator_wrapper;
|
||||
std::shared_ptr<StorageAzureBlobSource::IIterator> iterator_wrapper;
|
||||
if (configuration.withGlobs())
|
||||
{
|
||||
/// Iterate through disclosed globs and make a source for each file
|
||||
iterator_wrapper = std::make_shared<StorageAzureBlobSource::Iterator>(
|
||||
object_storage.get(), configuration.container, std::nullopt,
|
||||
configuration.blob_path, query_info.query, virtual_block, local_context, nullptr);
|
||||
iterator_wrapper = std::make_shared<StorageAzureBlobSource::GlobIterator>(
|
||||
object_storage.get(), configuration.container, configuration.blob_path,
|
||||
query_info.query, virtual_block, local_context, nullptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
iterator_wrapper = std::make_shared<StorageAzureBlobSource::Iterator>(
|
||||
iterator_wrapper = std::make_shared<StorageAzureBlobSource::KeysIterator>(
|
||||
object_storage.get(), configuration.container, configuration.blobs_paths,
|
||||
std::nullopt, query_info.query, virtual_block, local_context, nullptr);
|
||||
query_info.query, virtual_block, local_context, nullptr);
|
||||
}
|
||||
|
||||
ColumnsDescription columns_description;
|
||||
@ -799,202 +799,129 @@ static void addPathToVirtualColumns(Block & block, const String & path, size_t i
|
||||
block.getByName("_idx").column->assumeMutableRef().insert(idx);
|
||||
}
|
||||
|
||||
StorageAzureBlobSource::Iterator::Iterator(
|
||||
StorageAzureBlobSource::GlobIterator::GlobIterator(
|
||||
AzureObjectStorage * object_storage_,
|
||||
const std::string & container_,
|
||||
std::optional<Strings> keys_,
|
||||
std::optional<String> blob_path_with_globs_,
|
||||
String blob_path_with_globs_,
|
||||
ASTPtr query_,
|
||||
const Block & virtual_header_,
|
||||
ContextPtr context_,
|
||||
RelativePathsWithMetadata * outer_blobs_)
|
||||
: WithContext(context_)
|
||||
: IIterator(context_)
|
||||
, object_storage(object_storage_)
|
||||
, container(container_)
|
||||
, keys(keys_)
|
||||
, blob_path_with_globs(blob_path_with_globs_)
|
||||
, query(query_)
|
||||
, virtual_header(virtual_header_)
|
||||
, outer_blobs(outer_blobs_)
|
||||
{
|
||||
if (keys.has_value() && blob_path_with_globs.has_value())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot specify keys and glob simultaneously it's a bug");
|
||||
|
||||
if (!keys.has_value() && !blob_path_with_globs.has_value())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Both keys and glob mask are not specified");
|
||||
const String key_prefix = blob_path_with_globs.substr(0, blob_path_with_globs.find_first_of("*?{"));
|
||||
|
||||
if (keys)
|
||||
/// We don't have to list bucket, because there is no asterisks.
|
||||
if (key_prefix.size() == blob_path_with_globs.size())
|
||||
{
|
||||
Strings all_keys = *keys;
|
||||
|
||||
blobs_with_metadata.emplace();
|
||||
/// Create a virtual block with one row to construct filter
|
||||
if (query && virtual_header && !all_keys.empty())
|
||||
{
|
||||
/// Append "idx" column as the filter result
|
||||
virtual_header.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
|
||||
|
||||
auto block = virtual_header.cloneEmpty();
|
||||
addPathToVirtualColumns(block, fs::path(container) / all_keys.front(), 0);
|
||||
|
||||
VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
|
||||
|
||||
if (filter_ast)
|
||||
{
|
||||
block = virtual_header.cloneEmpty();
|
||||
for (size_t i = 0; i < all_keys.size(); ++i)
|
||||
addPathToVirtualColumns(block, fs::path(container) / all_keys[i], i);
|
||||
|
||||
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
|
||||
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
|
||||
|
||||
Strings filtered_keys;
|
||||
filtered_keys.reserve(block.rows());
|
||||
for (UInt64 idx : idxs.getData())
|
||||
filtered_keys.emplace_back(std::move(all_keys[idx]));
|
||||
|
||||
all_keys = std::move(filtered_keys);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto && key : all_keys)
|
||||
{
|
||||
ObjectMetadata object_metadata = object_storage->getObjectMetadata(key);
|
||||
total_size += object_metadata.size_bytes;
|
||||
blobs_with_metadata->emplace_back(RelativePathWithMetadata{key, object_metadata});
|
||||
if (outer_blobs)
|
||||
outer_blobs->emplace_back(blobs_with_metadata->back());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
const String key_prefix = blob_path_with_globs->substr(0, blob_path_with_globs->find_first_of("*?{"));
|
||||
|
||||
/// We don't have to list bucket, because there is no asterisks.
|
||||
if (key_prefix.size() == blob_path_with_globs->size())
|
||||
{
|
||||
ObjectMetadata object_metadata = object_storage->getObjectMetadata(*blob_path_with_globs);
|
||||
blobs_with_metadata->emplace_back(*blob_path_with_globs, object_metadata);
|
||||
if (outer_blobs)
|
||||
outer_blobs->emplace_back(blobs_with_metadata->back());
|
||||
return;
|
||||
}
|
||||
|
||||
object_storage_iterator = object_storage->iterate(key_prefix);
|
||||
|
||||
matcher = std::make_unique<re2::RE2>(makeRegexpPatternFromGlobs(*blob_path_with_globs));
|
||||
|
||||
if (!matcher->ok())
|
||||
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
|
||||
"Cannot compile regex from glob ({}): {}", *blob_path_with_globs, matcher->error());
|
||||
|
||||
recursive = *blob_path_with_globs == "/**" ? true : false;
|
||||
ObjectMetadata object_metadata = object_storage->getObjectMetadata(blob_path_with_globs);
|
||||
blobs_with_metadata.emplace_back(blob_path_with_globs, object_metadata);
|
||||
if (outer_blobs)
|
||||
outer_blobs->emplace_back(blobs_with_metadata.back());
|
||||
return;
|
||||
}
|
||||
|
||||
object_storage_iterator = object_storage->iterate(key_prefix);
|
||||
|
||||
matcher = std::make_unique<re2::RE2>(makeRegexpPatternFromGlobs(blob_path_with_globs));
|
||||
|
||||
if (!matcher->ok())
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_COMPILE_REGEXP, "Cannot compile regex from glob ({}): {}", blob_path_with_globs, matcher->error());
|
||||
|
||||
recursive = blob_path_with_globs == "/**" ? true : false;
|
||||
}
|
||||
|
||||
RelativePathWithMetadata StorageAzureBlobSource::Iterator::next()
|
||||
RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next()
|
||||
{
|
||||
std::lock_guard lock(next_mutex);
|
||||
|
||||
if (is_finished)
|
||||
return {};
|
||||
|
||||
if (keys)
|
||||
bool need_new_batch = blobs_with_metadata.empty() || index >= blobs_with_metadata.size();
|
||||
|
||||
if (need_new_batch)
|
||||
{
|
||||
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
|
||||
if (current_index >= blobs_with_metadata->size())
|
||||
RelativePathsWithMetadata new_batch;
|
||||
while (new_batch.empty())
|
||||
{
|
||||
is_finished = true;
|
||||
return {};
|
||||
}
|
||||
|
||||
return (*blobs_with_metadata)[current_index];
|
||||
}
|
||||
else
|
||||
{
|
||||
bool need_new_batch = false;
|
||||
{
|
||||
std::lock_guard lock(next_mutex);
|
||||
need_new_batch = !blobs_with_metadata || index >= blobs_with_metadata->size();
|
||||
}
|
||||
|
||||
if (need_new_batch)
|
||||
{
|
||||
RelativePathsWithMetadata new_batch;
|
||||
while (new_batch.empty())
|
||||
auto result = object_storage_iterator->getCurrrentBatchAndScheduleNext();
|
||||
if (result.has_value())
|
||||
{
|
||||
if (object_storage_iterator->isValid())
|
||||
{
|
||||
new_batch = object_storage_iterator->currentBatch();
|
||||
object_storage_iterator->nextBatch();
|
||||
}
|
||||
else
|
||||
{
|
||||
is_finished = true;
|
||||
return {};
|
||||
}
|
||||
|
||||
for (auto it = new_batch.begin(); it != new_batch.end();)
|
||||
{
|
||||
if (!recursive && !re2::RE2::FullMatch(it->relative_path, *matcher))
|
||||
it = new_batch.erase(it);
|
||||
else
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
index.store(0, std::memory_order_relaxed);
|
||||
if (!is_initialized)
|
||||
{
|
||||
createFilterAST(new_batch.front().relative_path);
|
||||
is_initialized = true;
|
||||
}
|
||||
|
||||
if (filter_ast)
|
||||
{
|
||||
auto block = virtual_header.cloneEmpty();
|
||||
for (size_t i = 0; i < new_batch.size(); ++i)
|
||||
addPathToVirtualColumns(block, fs::path(container) / new_batch[i].relative_path, i);
|
||||
|
||||
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
|
||||
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
|
||||
|
||||
std::lock_guard lock(next_mutex);
|
||||
blob_path_with_globs.reset();
|
||||
blob_path_with_globs.emplace();
|
||||
for (UInt64 idx : idxs.getData())
|
||||
{
|
||||
total_size.fetch_add(new_batch[idx].metadata.size_bytes, std::memory_order_relaxed);
|
||||
blobs_with_metadata->emplace_back(std::move(new_batch[idx]));
|
||||
if (outer_blobs)
|
||||
outer_blobs->emplace_back(blobs_with_metadata->back());
|
||||
}
|
||||
new_batch = result.value();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (outer_blobs)
|
||||
outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end());
|
||||
is_finished = true;
|
||||
return {};
|
||||
}
|
||||
|
||||
std::lock_guard lock(next_mutex);
|
||||
blobs_with_metadata = std::move(new_batch);
|
||||
for (const auto & [_, info] : *blobs_with_metadata)
|
||||
total_size.fetch_add(info.size_bytes, std::memory_order_relaxed);
|
||||
for (auto it = new_batch.begin(); it != new_batch.end();)
|
||||
{
|
||||
if (!recursive && !re2::RE2::FullMatch(it->relative_path, *matcher))
|
||||
it = new_batch.erase(it);
|
||||
else
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
|
||||
index = 0;
|
||||
if (!is_initialized)
|
||||
{
|
||||
createFilterAST(new_batch.front().relative_path);
|
||||
is_initialized = true;
|
||||
}
|
||||
|
||||
std::lock_guard lock(next_mutex);
|
||||
return (*blobs_with_metadata)[current_index];
|
||||
if (filter_ast)
|
||||
{
|
||||
auto block = virtual_header.cloneEmpty();
|
||||
for (size_t i = 0; i < new_batch.size(); ++i)
|
||||
addPathToVirtualColumns(block, fs::path(container) / new_batch[i].relative_path, i);
|
||||
|
||||
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
|
||||
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
|
||||
|
||||
blobs_with_metadata.clear();
|
||||
for (UInt64 idx : idxs.getData())
|
||||
{
|
||||
total_size.fetch_add(new_batch[idx].metadata.size_bytes, std::memory_order_relaxed);
|
||||
blobs_with_metadata.emplace_back(std::move(new_batch[idx]));
|
||||
if (outer_blobs)
|
||||
outer_blobs->emplace_back(blobs_with_metadata.back());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (outer_blobs)
|
||||
outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end());
|
||||
|
||||
blobs_with_metadata = std::move(new_batch);
|
||||
for (const auto & [_, info] : blobs_with_metadata)
|
||||
total_size.fetch_add(info.size_bytes, std::memory_order_relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
size_t current_index = index++;
|
||||
if (current_index >= blobs_with_metadata.size())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Index out of bound for blob metadata");
|
||||
return blobs_with_metadata[current_index];
|
||||
}
|
||||
|
||||
size_t StorageAzureBlobSource::Iterator::getTotalSize() const
|
||||
size_t StorageAzureBlobSource::GlobIterator::getTotalSize() const
|
||||
{
|
||||
return total_size.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
||||
void StorageAzureBlobSource::Iterator::createFilterAST(const String & any_key)
|
||||
void StorageAzureBlobSource::GlobIterator::createFilterAST(const String & any_key)
|
||||
{
|
||||
if (!query || !virtual_header)
|
||||
return;
|
||||
@ -1009,6 +936,78 @@ void StorageAzureBlobSource::Iterator::createFilterAST(const String & any_key)
|
||||
}
|
||||
|
||||
|
||||
StorageAzureBlobSource::KeysIterator::KeysIterator(
|
||||
AzureObjectStorage * object_storage_,
|
||||
const std::string & container_,
|
||||
Strings keys_,
|
||||
ASTPtr query_,
|
||||
const Block & virtual_header_,
|
||||
ContextPtr context_,
|
||||
RelativePathsWithMetadata * outer_blobs_)
|
||||
: IIterator(context_)
|
||||
, object_storage(object_storage_)
|
||||
, container(container_)
|
||||
, query(query_)
|
||||
, virtual_header(virtual_header_)
|
||||
, outer_blobs(outer_blobs_)
|
||||
{
|
||||
Strings all_keys = keys_;
|
||||
|
||||
/// Create a virtual block with one row to construct filter
|
||||
if (query && virtual_header && !all_keys.empty())
|
||||
{
|
||||
/// Append "idx" column as the filter result
|
||||
virtual_header.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
|
||||
|
||||
auto block = virtual_header.cloneEmpty();
|
||||
addPathToVirtualColumns(block, fs::path(container) / all_keys.front(), 0);
|
||||
|
||||
VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
|
||||
|
||||
if (filter_ast)
|
||||
{
|
||||
block = virtual_header.cloneEmpty();
|
||||
for (size_t i = 0; i < all_keys.size(); ++i)
|
||||
addPathToVirtualColumns(block, fs::path(container) / all_keys[i], i);
|
||||
|
||||
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
|
||||
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
|
||||
|
||||
Strings filtered_keys;
|
||||
filtered_keys.reserve(block.rows());
|
||||
for (UInt64 idx : idxs.getData())
|
||||
filtered_keys.emplace_back(std::move(all_keys[idx]));
|
||||
|
||||
all_keys = std::move(filtered_keys);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto && key : all_keys)
|
||||
{
|
||||
ObjectMetadata object_metadata = object_storage->getObjectMetadata(key);
|
||||
total_size += object_metadata.size_bytes;
|
||||
keys.emplace_back(RelativePathWithMetadata{key, object_metadata});
|
||||
}
|
||||
|
||||
if (outer_blobs)
|
||||
*outer_blobs = keys;
|
||||
}
|
||||
|
||||
RelativePathWithMetadata StorageAzureBlobSource::KeysIterator::next()
|
||||
{
|
||||
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
|
||||
if (current_index >= keys.size())
|
||||
return {};
|
||||
|
||||
return keys[current_index];
|
||||
}
|
||||
|
||||
size_t StorageAzureBlobSource::KeysIterator::getTotalSize() const
|
||||
{
|
||||
return total_size.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
||||
Chunk StorageAzureBlobSource::generate()
|
||||
{
|
||||
while (true)
|
||||
@ -1095,7 +1094,7 @@ StorageAzureBlobSource::StorageAzureBlobSource(
|
||||
String compression_hint_,
|
||||
AzureObjectStorage * object_storage_,
|
||||
const String & container_,
|
||||
std::shared_ptr<Iterator> file_iterator_)
|
||||
std::shared_ptr<IIterator> file_iterator_)
|
||||
:ISource(getHeader(sample_block_, requested_virtual_columns_))
|
||||
, WithContext(context_)
|
||||
, requested_virtual_columns(requested_virtual_columns_)
|
||||
@ -1196,18 +1195,16 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData(
|
||||
ContextPtr ctx)
|
||||
{
|
||||
RelativePathsWithMetadata read_keys;
|
||||
std::shared_ptr<StorageAzureBlobSource::Iterator> file_iterator;
|
||||
std::shared_ptr<StorageAzureBlobSource::IIterator> file_iterator;
|
||||
if (configuration.withGlobs())
|
||||
{
|
||||
file_iterator = std::make_shared<StorageAzureBlobSource::Iterator>(
|
||||
object_storage, configuration.container, std::nullopt,
|
||||
configuration.blob_path, nullptr, Block{}, ctx, &read_keys);
|
||||
file_iterator = std::make_shared<StorageAzureBlobSource::GlobIterator>(
|
||||
object_storage, configuration.container, configuration.blob_path, nullptr, Block{}, ctx, &read_keys);
|
||||
}
|
||||
else
|
||||
{
|
||||
file_iterator = std::make_shared<StorageAzureBlobSource::Iterator>(
|
||||
object_storage, configuration.container, configuration.blobs_paths,
|
||||
std::nullopt, nullptr, Block{}, ctx, &read_keys);
|
||||
file_iterator = std::make_shared<StorageAzureBlobSource::KeysIterator>(
|
||||
object_storage, configuration.container, configuration.blobs_paths, nullptr, Block{}, ctx, &read_keys);
|
||||
}
|
||||
|
||||
std::optional<ColumnsDescription> columns_from_cache;
|
||||
|
@ -142,36 +142,45 @@ private:
|
||||
class StorageAzureBlobSource : public ISource, WithContext
|
||||
{
|
||||
public:
|
||||
class Iterator : WithContext
|
||||
class IIterator : public WithContext
|
||||
{
|
||||
public:
|
||||
Iterator(
|
||||
IIterator(ContextPtr context_):WithContext(context_) {}
|
||||
virtual ~IIterator() = default;
|
||||
virtual RelativePathWithMetadata next() = 0;
|
||||
virtual size_t getTotalSize() const = 0;
|
||||
|
||||
RelativePathWithMetadata operator ()() { return next(); }
|
||||
};
|
||||
|
||||
class GlobIterator : public IIterator
|
||||
{
|
||||
public:
|
||||
GlobIterator(
|
||||
AzureObjectStorage * object_storage_,
|
||||
const std::string & container_,
|
||||
std::optional<Strings> keys_,
|
||||
std::optional<String> blob_path_with_globs_,
|
||||
String blob_path_with_globs_,
|
||||
ASTPtr query_,
|
||||
const Block & virtual_header_,
|
||||
ContextPtr context_,
|
||||
RelativePathsWithMetadata * outer_blobs_);
|
||||
|
||||
RelativePathWithMetadata next();
|
||||
size_t getTotalSize() const;
|
||||
~Iterator() = default;
|
||||
RelativePathWithMetadata next() override;
|
||||
size_t getTotalSize() const override;
|
||||
~GlobIterator() override = default;
|
||||
|
||||
private:
|
||||
AzureObjectStorage * object_storage;
|
||||
std::string container;
|
||||
std::optional<Strings> keys;
|
||||
std::optional<String> blob_path_with_globs;
|
||||
String blob_path_with_globs;
|
||||
ASTPtr query;
|
||||
ASTPtr filter_ast;
|
||||
Block virtual_header;
|
||||
|
||||
std::atomic<size_t> index = 0;
|
||||
size_t index = 0;
|
||||
std::atomic<size_t> total_size = 0;
|
||||
|
||||
std::optional<RelativePathsWithMetadata> blobs_with_metadata;
|
||||
RelativePathsWithMetadata blobs_with_metadata;
|
||||
RelativePathsWithMetadata * outer_blobs;
|
||||
ObjectStorageIteratorPtr object_storage_iterator;
|
||||
bool recursive{false};
|
||||
@ -179,11 +188,42 @@ public:
|
||||
std::unique_ptr<re2::RE2> matcher;
|
||||
|
||||
void createFilterAST(const String & any_key);
|
||||
std::atomic<bool> is_finished = false;
|
||||
std::atomic<bool> is_initialized = false;
|
||||
bool is_finished = false;
|
||||
bool is_initialized = false;
|
||||
std::mutex next_mutex;
|
||||
};
|
||||
|
||||
class KeysIterator : public IIterator
|
||||
{
|
||||
public:
|
||||
KeysIterator(
|
||||
AzureObjectStorage * object_storage_,
|
||||
const std::string & container_,
|
||||
Strings keys_,
|
||||
ASTPtr query_,
|
||||
const Block & virtual_header_,
|
||||
ContextPtr context_,
|
||||
RelativePathsWithMetadata * outer_blobs_);
|
||||
|
||||
RelativePathWithMetadata next() override;
|
||||
size_t getTotalSize() const override;
|
||||
~KeysIterator() override = default;
|
||||
|
||||
private:
|
||||
AzureObjectStorage * object_storage;
|
||||
std::string container;
|
||||
RelativePathsWithMetadata keys;
|
||||
|
||||
ASTPtr query;
|
||||
ASTPtr filter_ast;
|
||||
Block virtual_header;
|
||||
|
||||
std::atomic<size_t> index = 0;
|
||||
std::atomic<size_t> total_size = 0;
|
||||
|
||||
RelativePathsWithMetadata * outer_blobs;
|
||||
};
|
||||
|
||||
StorageAzureBlobSource(
|
||||
const std::vector<NameAndTypePair> & requested_virtual_columns_,
|
||||
const String & format_,
|
||||
@ -196,7 +236,7 @@ public:
|
||||
String compression_hint_,
|
||||
AzureObjectStorage * object_storage_,
|
||||
const String & container_,
|
||||
std::shared_ptr<Iterator> file_iterator_);
|
||||
std::shared_ptr<IIterator> file_iterator_);
|
||||
|
||||
~StorageAzureBlobSource() override;
|
||||
|
||||
@ -217,7 +257,7 @@ private:
|
||||
String compression_hint;
|
||||
AzureObjectStorage * object_storage;
|
||||
String container;
|
||||
std::shared_ptr<Iterator> file_iterator;
|
||||
std::shared_ptr<IIterator> file_iterator;
|
||||
|
||||
struct ReaderHolder
|
||||
{
|
||||
|
@ -300,10 +300,10 @@ def test_put_get_with_globs(cluster):
|
||||
|
||||
azure_query(
|
||||
node,
|
||||
f"CREATE TABLE test_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
|
||||
f"CREATE TABLE test_put_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
|
||||
)
|
||||
|
||||
query = f"insert into test_{i}_{j} VALUES {values}"
|
||||
query = f"insert into test_put_{i}_{j} VALUES {values}"
|
||||
azure_query(node, query)
|
||||
|
||||
azure_query(
|
||||
@ -332,9 +332,11 @@ def test_azure_glob_scheherazade(cluster):
|
||||
unique_num = random.randint(1, 10000)
|
||||
azure_query(
|
||||
node,
|
||||
f"CREATE TABLE test_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
|
||||
f"CREATE TABLE test_scheherazade_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
|
||||
)
|
||||
query = (
|
||||
f"insert into test_scheherazade_{i}_{unique_num} VALUES {values}"
|
||||
)
|
||||
query = f"insert into test_{i}_{unique_num} VALUES {values}"
|
||||
azure_query(node, query)
|
||||
|
||||
jobs.append(
|
||||
@ -558,6 +560,7 @@ def test_schema_inference_from_globs_tf(cluster):
|
||||
node = cluster.instances["node"] # type: ClickHouseInstance
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
max_path = ""
|
||||
|
||||
for i in range(10):
|
||||
for j in range(10):
|
||||
path = "{}/{}_{}/{}.csv".format(
|
||||
@ -582,13 +585,29 @@ def test_partition_by_tf(cluster):
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
partition_by = "column3"
|
||||
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
|
||||
filename = "test_tf_{_partition_id}.csv"
|
||||
filename = "test_partition_tf_{_partition_id}.csv"
|
||||
|
||||
azure_query(
|
||||
node,
|
||||
f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
|
||||
)
|
||||
|
||||
assert "1,2,3\n" == get_azure_file_content("test_tf_3.csv")
|
||||
assert "3,2,1\n" == get_azure_file_content("test_tf_1.csv")
|
||||
assert "78,43,45\n" == get_azure_file_content("test_tf_45.csv")
|
||||
assert "1,2,3\n" == get_azure_file_content("test_partition_tf_3.csv")
|
||||
assert "3,2,1\n" == get_azure_file_content("test_partition_tf_1.csv")
|
||||
assert "78,43,45\n" == get_azure_file_content("test_partition_tf_45.csv")
|
||||
|
||||
|
||||
def test_filter_using_file(cluster):
|
||||
node = cluster.instances["node"]
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
partition_by = "column3"
|
||||
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
|
||||
filename = "test_partition_tf_{_partition_id}.csv"
|
||||
|
||||
azure_query(
|
||||
node,
|
||||
f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
|
||||
)
|
||||
|
||||
query = f"select count(*) from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_partition_tf_*.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') WHERE _file='test_partition_tf_3.csv'"
|
||||
assert azure_query(node, query) == "1\n"
|
||||
|
@ -2169,6 +2169,7 @@ snowflakeToDateTime
|
||||
socketcache
|
||||
soundex
|
||||
sparkbar
|
||||
sparkBar
|
||||
sparsehash
|
||||
speedscope
|
||||
splitByChar
|
||||
|
Loading…
Reference in New Issue
Block a user