Merge pull request #50936 from ClickHouse/Fix_race_azure_blob_storage_iterator

Fix race azure blob storage iterator
SmitaRKulkarni, 2023-06-27 09:27:45 +02:00, committed by GitHub
commit dcf581a985
6 changed files with 270 additions and 191 deletions
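
The race this commit removes: the old StorageAzureBlobSource::Iterator::next() pulled listing results from the async object-storage iterator through three separate calls (isValid(), currentBatch(), nextBatch()) and synchronized its own state with a mix of atomics and scoped locks, so two threads calling next() concurrently could receive the same batch or advance past one. The fix has three parts: IObjectStorageIterator gains getCurrrentBatchAndScheduleNext(), which returns the current batch and schedules the next listing under a single lock; GlobIterator::next() runs entirely under one next_mutex; and the dual-mode Iterator is split into GlobIterator (glob paths) and KeysIterator (explicit key lists). A minimal sketch of the race and the fix, using a hypothetical BatchIterator rather than the ClickHouse classes:

// Check-then-act race: with the three-call protocol, another thread can run
// between currentBatch() and nextBatch(), so two threads get the same batch.
// The combined call makes read-and-advance a single critical section.
#include <cstddef>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

using Batch = std::vector<std::string>;

class BatchIterator
{
public:
    /// Old protocol: three separately locked steps (check, read, advance).
    bool isValid() { std::lock_guard lock(mutex); return pos < batches.size(); }
    Batch currentBatch() { std::lock_guard lock(mutex); return batches[pos]; }
    void nextBatch() { std::lock_guard lock(mutex); ++pos; }

    /// New protocol: read and advance under one lock; each batch is handed
    /// out exactly once, std::nullopt signals exhaustion.
    std::optional<Batch> getCurrentBatchAndScheduleNext()
    {
        std::lock_guard lock(mutex);
        if (pos >= batches.size())
            return std::nullopt;
        return batches[pos++];
    }

private:
    std::mutex mutex;
    std::size_t pos = 0;
    std::vector<Batch> batches{{"a.csv"}, {"b.csv"}};
};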

@ -14,6 +14,7 @@ public:
virtual bool isValid() = 0;
virtual RelativePathWithMetadata current() = 0;
virtual RelativePathsWithMetadata currentBatch() = 0;
virtual std::optional<RelativePathsWithMetadata> getCurrrentBatchAndScheduleNext() = 0;
virtual size_t getAccumulatedSize() const = 0;
virtual ~IObjectStorageIterator() = default;
@ -53,6 +54,11 @@ public:
return batch;
}
virtual std::optional<RelativePathsWithMetadata> getCurrrentBatchAndScheduleNext() override
{
return std::nullopt;
}
size_t getAccumulatedSize() const override
{
return batch.size();

@ -100,6 +100,22 @@ RelativePathsWithMetadata IObjectStorageIteratorAsync::currentBatch()
return current_batch;
}
std::optional<RelativePathsWithMetadata> IObjectStorageIteratorAsync::getCurrrentBatchAndScheduleNext()
{
std::lock_guard lock(mutex);
if (!is_initialized)
nextBatch();
if (current_batch_iterator != current_batch.end())
{
auto temp_current_batch = current_batch;
nextBatch();
return temp_current_batch;
}
return std::nullopt;
}
size_t IObjectStorageIteratorAsync::getAccumulatedSize() const
{
return accumulated_size.load(std::memory_order_relaxed);

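Note the re-entrant locking in this hunk: getCurrrentBatchAndScheduleNext() acquires the mutex and then calls nextBatch(), which acquires the same mutex again on the same thread. That is why the header change below swaps the member from std::mutex to std::recursive_mutex; with a plain std::mutex the nested acquisition would deadlock (formally, undefined behaviour). A reduced illustration of the same shape:

// Why the member becomes std::recursive_mutex: the combined call locks and
// then re-enters nextBatch(), which locks again on the same thread.
#include <mutex>

struct AsyncIterator
{
    mutable std::recursive_mutex mutex;

    void nextBatch()
    {
        std::lock_guard lock(mutex);  // second acquisition, same thread
        // ... schedule the next background listing ...
    }

    void getBatchAndScheduleNext()
    {
        std::lock_guard lock(mutex);  // first acquisition
        nextBatch();                  // re-entrant: safe only because the
                                      // mutex is recursive
    }
};
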
@ -27,6 +27,7 @@ public:
RelativePathWithMetadata current() override;
RelativePathsWithMetadata currentBatch() override;
size_t getAccumulatedSize() const override;
std::optional<RelativePathsWithMetadata> getCurrrentBatchAndScheduleNext() override;
~IObjectStorageIteratorAsync() override
{
@ -48,7 +49,7 @@ protected:
bool is_initialized{false};
bool is_finished{false};
mutable std::mutex mutex;
mutable std::recursive_mutex mutex;
ThreadPool list_objects_pool;
ThreadPoolCallbackRunner<BatchAndHasNext> list_objects_scheduler;
std::future<BatchAndHasNext> outcome_future;

@ -624,19 +624,19 @@ Pipe StorageAzureBlob::read(
requested_virtual_columns.push_back(virtual_column);
}
std::shared_ptr<StorageAzureBlobSource::Iterator> iterator_wrapper;
std::shared_ptr<StorageAzureBlobSource::IIterator> iterator_wrapper;
if (configuration.withGlobs())
{
/// Iterate through disclosed globs and make a source for each file
iterator_wrapper = std::make_shared<StorageAzureBlobSource::Iterator>(
object_storage.get(), configuration.container, std::nullopt,
configuration.blob_path, query_info.query, virtual_block, local_context, nullptr);
iterator_wrapper = std::make_shared<StorageAzureBlobSource::GlobIterator>(
object_storage.get(), configuration.container, configuration.blob_path,
query_info.query, virtual_block, local_context, nullptr);
}
else
{
iterator_wrapper = std::make_shared<StorageAzureBlobSource::Iterator>(
iterator_wrapper = std::make_shared<StorageAzureBlobSource::KeysIterator>(
object_storage.get(), configuration.container, configuration.blobs_paths,
std::nullopt, query_info.query, virtual_block, local_context, nullptr);
query_info.query, virtual_block, local_context, nullptr);
}
ColumnsDescription columns_description;
@ -799,202 +799,129 @@ static void addPathToVirtualColumns(Block & block, const String & path, size_t i
block.getByName("_idx").column->assumeMutableRef().insert(idx);
}
StorageAzureBlobSource::Iterator::Iterator(
StorageAzureBlobSource::GlobIterator::GlobIterator(
AzureObjectStorage * object_storage_,
const std::string & container_,
std::optional<Strings> keys_,
std::optional<String> blob_path_with_globs_,
String blob_path_with_globs_,
ASTPtr query_,
const Block & virtual_header_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs_)
: WithContext(context_)
: IIterator(context_)
, object_storage(object_storage_)
, container(container_)
, keys(keys_)
, blob_path_with_globs(blob_path_with_globs_)
, query(query_)
, virtual_header(virtual_header_)
, outer_blobs(outer_blobs_)
{
if (keys.has_value() && blob_path_with_globs.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot specify keys and glob simultaneously it's a bug");
if (!keys.has_value() && !blob_path_with_globs.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Both keys and glob mask are not specified");
const String key_prefix = blob_path_with_globs.substr(0, blob_path_with_globs.find_first_of("*?{"));
if (keys)
/// We don't have to list the bucket, because there are no asterisks.
if (key_prefix.size() == blob_path_with_globs.size())
{
Strings all_keys = *keys;
blobs_with_metadata.emplace();
/// Create a virtual block with one row to construct filter
if (query && virtual_header && !all_keys.empty())
{
/// Append "idx" column as the filter result
virtual_header.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
auto block = virtual_header.cloneEmpty();
addPathToVirtualColumns(block, fs::path(container) / all_keys.front(), 0);
VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
if (filter_ast)
{
block = virtual_header.cloneEmpty();
for (size_t i = 0; i < all_keys.size(); ++i)
addPathToVirtualColumns(block, fs::path(container) / all_keys[i], i);
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
Strings filtered_keys;
filtered_keys.reserve(block.rows());
for (UInt64 idx : idxs.getData())
filtered_keys.emplace_back(std::move(all_keys[idx]));
all_keys = std::move(filtered_keys);
}
}
for (auto && key : all_keys)
{
ObjectMetadata object_metadata = object_storage->getObjectMetadata(key);
total_size += object_metadata.size_bytes;
blobs_with_metadata->emplace_back(RelativePathWithMetadata{key, object_metadata});
if (outer_blobs)
outer_blobs->emplace_back(blobs_with_metadata->back());
}
}
else
{
const String key_prefix = blob_path_with_globs->substr(0, blob_path_with_globs->find_first_of("*?{"));
/// We don't have to list bucket, because there is no asterisks.
if (key_prefix.size() == blob_path_with_globs->size())
{
ObjectMetadata object_metadata = object_storage->getObjectMetadata(*blob_path_with_globs);
blobs_with_metadata->emplace_back(*blob_path_with_globs, object_metadata);
if (outer_blobs)
outer_blobs->emplace_back(blobs_with_metadata->back());
return;
}
object_storage_iterator = object_storage->iterate(key_prefix);
matcher = std::make_unique<re2::RE2>(makeRegexpPatternFromGlobs(*blob_path_with_globs));
if (!matcher->ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", *blob_path_with_globs, matcher->error());
recursive = *blob_path_with_globs == "/**" ? true : false;
ObjectMetadata object_metadata = object_storage->getObjectMetadata(blob_path_with_globs);
blobs_with_metadata.emplace_back(blob_path_with_globs, object_metadata);
if (outer_blobs)
outer_blobs->emplace_back(blobs_with_metadata.back());
return;
}
object_storage_iterator = object_storage->iterate(key_prefix);
matcher = std::make_unique<re2::RE2>(makeRegexpPatternFromGlobs(blob_path_with_globs));
if (!matcher->ok())
throw Exception(
ErrorCodes::CANNOT_COMPILE_REGEXP, "Cannot compile regex from glob ({}): {}", blob_path_with_globs, matcher->error());
recursive = blob_path_with_globs == "/**" ? true : false;
}
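
The constructor above keeps the old fast path: everything before the first glob metacharacter becomes the listing prefix, and when the whole path is literal (no *, ? or {) a single getObjectMetadata() call replaces the container listing entirely. A standalone sketch of that split (splitGlobPrefix is an invented helper, not ClickHouse API):

#include <cstddef>
#include <string>
#include <utility>

/// Splits a blob path into the literal prefix used for listing and a flag
/// saying whether a listing is needed at all:
/// "dir/a.csv"            -> {"dir/a.csv", false}
/// "dir/part*/f_{1}.csv"  -> {"dir/part", true}
std::pair<std::string, bool> splitGlobPrefix(const std::string & path)
{
    const std::size_t pos = path.find_first_of("*?{");
    const bool needs_listing = pos != std::string::npos;
    return {path.substr(0, needs_listing ? pos : path.size()), needs_listing};
}
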
RelativePathWithMetadata StorageAzureBlobSource::Iterator::next()
RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next()
{
std::lock_guard lock(next_mutex);
if (is_finished)
return {};
if (keys)
bool need_new_batch = blobs_with_metadata.empty() || index >= blobs_with_metadata.size();
if (need_new_batch)
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= blobs_with_metadata->size())
RelativePathsWithMetadata new_batch;
while (new_batch.empty())
{
is_finished = true;
return {};
}
return (*blobs_with_metadata)[current_index];
}
else
{
bool need_new_batch = false;
{
std::lock_guard lock(next_mutex);
need_new_batch = !blobs_with_metadata || index >= blobs_with_metadata->size();
}
if (need_new_batch)
{
RelativePathsWithMetadata new_batch;
while (new_batch.empty())
auto result = object_storage_iterator->getCurrrentBatchAndScheduleNext();
if (result.has_value())
{
if (object_storage_iterator->isValid())
{
new_batch = object_storage_iterator->currentBatch();
object_storage_iterator->nextBatch();
}
else
{
is_finished = true;
return {};
}
for (auto it = new_batch.begin(); it != new_batch.end();)
{
if (!recursive && !re2::RE2::FullMatch(it->relative_path, *matcher))
it = new_batch.erase(it);
else
++it;
}
}
index.store(0, std::memory_order_relaxed);
if (!is_initialized)
{
createFilterAST(new_batch.front().relative_path);
is_initialized = true;
}
if (filter_ast)
{
auto block = virtual_header.cloneEmpty();
for (size_t i = 0; i < new_batch.size(); ++i)
addPathToVirtualColumns(block, fs::path(container) / new_batch[i].relative_path, i);
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
std::lock_guard lock(next_mutex);
blob_path_with_globs.reset();
blob_path_with_globs.emplace();
for (UInt64 idx : idxs.getData())
{
total_size.fetch_add(new_batch[idx].metadata.size_bytes, std::memory_order_relaxed);
blobs_with_metadata->emplace_back(std::move(new_batch[idx]));
if (outer_blobs)
outer_blobs->emplace_back(blobs_with_metadata->back());
}
new_batch = result.value();
}
else
{
if (outer_blobs)
outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end());
is_finished = true;
return {};
}
std::lock_guard lock(next_mutex);
blobs_with_metadata = std::move(new_batch);
for (const auto & [_, info] : *blobs_with_metadata)
total_size.fetch_add(info.size_bytes, std::memory_order_relaxed);
for (auto it = new_batch.begin(); it != new_batch.end();)
{
if (!recursive && !re2::RE2::FullMatch(it->relative_path, *matcher))
it = new_batch.erase(it);
else
++it;
}
}
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
index = 0;
if (!is_initialized)
{
createFilterAST(new_batch.front().relative_path);
is_initialized = true;
}
std::lock_guard lock(next_mutex);
return (*blobs_with_metadata)[current_index];
if (filter_ast)
{
auto block = virtual_header.cloneEmpty();
for (size_t i = 0; i < new_batch.size(); ++i)
addPathToVirtualColumns(block, fs::path(container) / new_batch[i].relative_path, i);
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
blobs_with_metadata.clear();
for (UInt64 idx : idxs.getData())
{
total_size.fetch_add(new_batch[idx].metadata.size_bytes, std::memory_order_relaxed);
blobs_with_metadata.emplace_back(std::move(new_batch[idx]));
if (outer_blobs)
outer_blobs->emplace_back(blobs_with_metadata.back());
}
}
else
{
if (outer_blobs)
outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end());
blobs_with_metadata = std::move(new_batch);
for (const auto & [_, info] : blobs_with_metadata)
total_size.fetch_add(info.size_bytes, std::memory_order_relaxed);
}
}
size_t current_index = index++;
if (current_index >= blobs_with_metadata.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Index out of bound for blob metadata");
return blobs_with_metadata[current_index];
}
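
Note the locking simplification in next(): the single std::lock_guard on next_mutex at the top of the function now covers the batch refill, the filtering and the index increment, which is why the header change further down demotes index, is_finished and is_initialized from atomics to plain members. Only total_size stays atomic, because getTotalSize() still reads it without taking the lock.
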
size_t StorageAzureBlobSource::Iterator::getTotalSize() const
size_t StorageAzureBlobSource::GlobIterator::getTotalSize() const
{
return total_size.load(std::memory_order_relaxed);
}
void StorageAzureBlobSource::Iterator::createFilterAST(const String & any_key)
void StorageAzureBlobSource::GlobIterator::createFilterAST(const String & any_key)
{
if (!query || !virtual_header)
return;
@ -1009,6 +936,78 @@ void StorageAzureBlobSource::Iterator::createFilterAST(const String & any_key)
}
StorageAzureBlobSource::KeysIterator::KeysIterator(
AzureObjectStorage * object_storage_,
const std::string & container_,
Strings keys_,
ASTPtr query_,
const Block & virtual_header_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs_)
: IIterator(context_)
, object_storage(object_storage_)
, container(container_)
, query(query_)
, virtual_header(virtual_header_)
, outer_blobs(outer_blobs_)
{
Strings all_keys = keys_;
/// Create a virtual block with one row to construct filter
if (query && virtual_header && !all_keys.empty())
{
/// Append "idx" column as the filter result
virtual_header.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
auto block = virtual_header.cloneEmpty();
addPathToVirtualColumns(block, fs::path(container) / all_keys.front(), 0);
VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
if (filter_ast)
{
block = virtual_header.cloneEmpty();
for (size_t i = 0; i < all_keys.size(); ++i)
addPathToVirtualColumns(block, fs::path(container) / all_keys[i], i);
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
Strings filtered_keys;
filtered_keys.reserve(block.rows());
for (UInt64 idx : idxs.getData())
filtered_keys.emplace_back(std::move(all_keys[idx]));
all_keys = std::move(filtered_keys);
}
}
for (auto && key : all_keys)
{
ObjectMetadata object_metadata = object_storage->getObjectMetadata(key);
total_size += object_metadata.size_bytes;
keys.emplace_back(RelativePathWithMetadata{key, object_metadata});
}
if (outer_blobs)
*outer_blobs = keys;
}
RelativePathWithMetadata StorageAzureBlobSource::KeysIterator::next()
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= keys.size())
return {};
return keys[current_index];
}
size_t StorageAzureBlobSource::KeysIterator::getTotalSize() const
{
return total_size.load(std::memory_order_relaxed);
}
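
KeysIterator, by contrast, needs no mutex at all: the key list is filtered and fully materialized in the constructor and never mutated afterwards, so next() can hand each key to exactly one caller with a single relaxed fetch_add. A standalone sketch of that scheme (FixedKeys is an illustrative name, not the ClickHouse class):

#include <atomic>
#include <cstddef>
#include <string>
#include <vector>

class FixedKeys
{
public:
    explicit FixedKeys(std::vector<std::string> keys_) : keys(std::move(keys_)) {}

    /// Lock-free: the vector is immutable after construction, so a relaxed
    /// fetch_add is enough to give each key to exactly one caller. An empty
    /// string signals exhaustion, mirroring the `return {}` above.
    std::string next()
    {
        const std::size_t i = index.fetch_add(1, std::memory_order_relaxed);
        return i < keys.size() ? keys[i] : std::string{};
    }

private:
    const std::vector<std::string> keys;
    std::atomic<std::size_t> index{0};
};
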
Chunk StorageAzureBlobSource::generate()
{
while (true)
@ -1095,7 +1094,7 @@ StorageAzureBlobSource::StorageAzureBlobSource(
String compression_hint_,
AzureObjectStorage * object_storage_,
const String & container_,
std::shared_ptr<Iterator> file_iterator_)
std::shared_ptr<IIterator> file_iterator_)
:ISource(getHeader(sample_block_, requested_virtual_columns_))
, WithContext(context_)
, requested_virtual_columns(requested_virtual_columns_)
@ -1196,18 +1195,16 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData(
ContextPtr ctx)
{
RelativePathsWithMetadata read_keys;
std::shared_ptr<StorageAzureBlobSource::Iterator> file_iterator;
std::shared_ptr<StorageAzureBlobSource::IIterator> file_iterator;
if (configuration.withGlobs())
{
file_iterator = std::make_shared<StorageAzureBlobSource::Iterator>(
object_storage, configuration.container, std::nullopt,
configuration.blob_path, nullptr, Block{}, ctx, &read_keys);
file_iterator = std::make_shared<StorageAzureBlobSource::GlobIterator>(
object_storage, configuration.container, configuration.blob_path, nullptr, Block{}, ctx, &read_keys);
}
else
{
file_iterator = std::make_shared<StorageAzureBlobSource::Iterator>(
object_storage, configuration.container, configuration.blobs_paths,
std::nullopt, nullptr, Block{}, ctx, &read_keys);
file_iterator = std::make_shared<StorageAzureBlobSource::KeysIterator>(
object_storage, configuration.container, configuration.blobs_paths, nullptr, Block{}, ctx, &read_keys);
}
std::optional<ColumnsDescription> columns_from_cache;

@ -142,36 +142,45 @@ private:
class StorageAzureBlobSource : public ISource, WithContext
{
public:
class Iterator : WithContext
class IIterator : public WithContext
{
public:
Iterator(
IIterator(ContextPtr context_):WithContext(context_) {}
virtual ~IIterator() = default;
virtual RelativePathWithMetadata next() = 0;
virtual size_t getTotalSize() const = 0;
RelativePathWithMetadata operator ()() { return next(); }
};
class GlobIterator : public IIterator
{
public:
GlobIterator(
AzureObjectStorage * object_storage_,
const std::string & container_,
std::optional<Strings> keys_,
std::optional<String> blob_path_with_globs_,
String blob_path_with_globs_,
ASTPtr query_,
const Block & virtual_header_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs_);
RelativePathWithMetadata next();
size_t getTotalSize() const;
~Iterator() = default;
RelativePathWithMetadata next() override;
size_t getTotalSize() const override;
~GlobIterator() override = default;
private:
AzureObjectStorage * object_storage;
std::string container;
std::optional<Strings> keys;
std::optional<String> blob_path_with_globs;
String blob_path_with_globs;
ASTPtr query;
ASTPtr filter_ast;
Block virtual_header;
std::atomic<size_t> index = 0;
size_t index = 0;
std::atomic<size_t> total_size = 0;
std::optional<RelativePathsWithMetadata> blobs_with_metadata;
RelativePathsWithMetadata blobs_with_metadata;
RelativePathsWithMetadata * outer_blobs;
ObjectStorageIteratorPtr object_storage_iterator;
bool recursive{false};
@ -179,11 +188,42 @@ public:
std::unique_ptr<re2::RE2> matcher;
void createFilterAST(const String & any_key);
std::atomic<bool> is_finished = false;
std::atomic<bool> is_initialized = false;
bool is_finished = false;
bool is_initialized = false;
std::mutex next_mutex;
};
class KeysIterator : public IIterator
{
public:
KeysIterator(
AzureObjectStorage * object_storage_,
const std::string & container_,
Strings keys_,
ASTPtr query_,
const Block & virtual_header_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs_);
RelativePathWithMetadata next() override;
size_t getTotalSize() const override;
~KeysIterator() override = default;
private:
AzureObjectStorage * object_storage;
std::string container;
RelativePathsWithMetadata keys;
ASTPtr query;
ASTPtr filter_ast;
Block virtual_header;
std::atomic<size_t> index = 0;
std::atomic<size_t> total_size = 0;
RelativePathsWithMetadata * outer_blobs;
};
StorageAzureBlobSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const String & format_,
@ -196,7 +236,7 @@ public:
String compression_hint_,
AzureObjectStorage * object_storage_,
const String & container_,
std::shared_ptr<Iterator> file_iterator_);
std::shared_ptr<IIterator> file_iterator_);
~StorageAzureBlobSource() override;
@ -217,7 +257,7 @@ private:
String compression_hint;
AzureObjectStorage * object_storage;
String container;
std::shared_ptr<Iterator> file_iterator;
std::shared_ptr<IIterator> file_iterator;
struct ReaderHolder
{

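The IIterator interface above also gives both iterators a callable facade: operator() simply forwards to next(), so a single shared iterator can be handed to every source and invoked like a generator. A compilable sketch of that facade (OneShot is invented for the example):

#include <iostream>
#include <string>

struct IFiles
{
    virtual ~IFiles() = default;
    virtual std::string next() = 0;
    std::string operator()() { return next(); }  // callable facade over next()
};

/// Invented single-element iterator, just to make the sketch runnable.
struct OneShot : IFiles
{
    bool done = false;
    std::string next() override { return done ? "" : (done = true, "data.csv"); }
};

int main()
{
    OneShot files;
    // Pull until the iterator reports exhaustion with an empty path.
    for (std::string path = files(); !path.empty(); path = files())
        std::cout << path << '\n';
}
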
@ -300,10 +300,10 @@ def test_put_get_with_globs(cluster):
azure_query(
node,
f"CREATE TABLE test_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
f"CREATE TABLE test_put_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
)
query = f"insert into test_{i}_{j} VALUES {values}"
query = f"insert into test_put_{i}_{j} VALUES {values}"
azure_query(node, query)
azure_query(
@ -332,9 +332,11 @@ def test_azure_glob_scheherazade(cluster):
unique_num = random.randint(1, 10000)
azure_query(
node,
f"CREATE TABLE test_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
f"CREATE TABLE test_scheherazade_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
)
query = (
f"insert into test_scheherazade_{i}_{unique_num} VALUES {values}"
)
query = f"insert into test_{i}_{unique_num} VALUES {values}"
azure_query(node, query)
jobs.append(
@ -558,6 +560,7 @@ def test_schema_inference_from_globs_tf(cluster):
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
max_path = ""
for i in range(10):
for j in range(10):
path = "{}/{}_{}/{}.csv".format(
@ -582,13 +585,29 @@ def test_partition_by_tf(cluster):
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
filename = "test_tf_{_partition_id}.csv"
filename = "test_partition_tf_{_partition_id}.csv"
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
)
assert "1,2,3\n" == get_azure_file_content("test_tf_3.csv")
assert "3,2,1\n" == get_azure_file_content("test_tf_1.csv")
assert "78,43,45\n" == get_azure_file_content("test_tf_45.csv")
assert "1,2,3\n" == get_azure_file_content("test_partition_tf_3.csv")
assert "3,2,1\n" == get_azure_file_content("test_partition_tf_1.csv")
assert "78,43,45\n" == get_azure_file_content("test_partition_tf_45.csv")
def test_filter_using_file(cluster):
node = cluster.instances["node"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
filename = "test_partition_tf_{_partition_id}.csv"
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
)
query = f"select count(*) from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_partition_tf_*.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') WHERE _file='test_partition_tf_3.csv'"
assert azure_query(node, query) == "1\n"