Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHouse into azure_table_function

alesapin 2023-06-06 16:58:05 +02:00
commit 14470b46c7
4 changed files with 66 additions and 49 deletions

View File

@@ -40,6 +40,9 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/Pipe.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
using namespace Azure::Storage::Blobs;
@@ -155,11 +158,6 @@ StorageAzure::Configuration StorageAzure::getConfiguration(ASTs & engine_args, C
configuration.container = checkAndGetLiteralArgument<String>(engine_args[1], "container");
configuration.blob_path = checkAndGetLiteralArgument<String>(engine_args[2], "blobpath");
LOG_INFO(&Poco::Logger::get("StorageAzure"), "connection_url = {}", configuration.connection_url);
LOG_INFO(&Poco::Logger::get("StorageAzure"), "container = {}", configuration.container);
LOG_INFO(&Poco::Logger::get("StorageAzure"), "blobpath = {}", configuration.blob_path);
auto is_format_arg = [] (const std::string & s) -> bool
{
return s == "auto" || FormatFactory::instance().getAllFormats().contains(s);
@@ -231,8 +229,6 @@ StorageAzure::Configuration StorageAzure::getConfiguration(ASTs & engine_args, C
configuration.blobs_paths = {configuration.blob_path};
LOG_INFO(&Poco::Logger::get("StorageAzure"), "get_format_from_file = {}", get_format_from_file);
if (configuration.format == "auto" && get_format_from_file)
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true);
@@ -311,7 +307,6 @@ AzureClientPtr StorageAzure::createClient(StorageAzure::Configuration configurat
if (configuration.is_connection_string)
{
LOG_INFO(&Poco::Logger::get("StorageAzure"), "createClient is_connection_string ");
result = std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(configuration.connection_url, configuration.container));
result->CreateIfNotExists();
}
@@ -418,18 +413,6 @@ StorageAzure::StorageAzure(
for (const auto & key : configuration.blobs_paths)
objects.emplace_back(key);
for (auto obj : objects)
{
LOG_INFO(&Poco::Logger::get("StorageAzure"), "constructor obj.remote_paths = {}", obj.remote_path);
if (object_storage->exists(obj))
{
LOG_INFO(&Poco::Logger::get("StorageAzure"), "constructor exists obj.remote_paths = {}", obj.remote_path);
// auto read_buffer = object_storage->readObject(obj);
// LOG_INFO(&Poco::Logger::get("StorageAzure"), "constructor read size obj.remote_paths = {} , size = {}", obj.remote_path, read_buffer->getFileSize());
}
}
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
@@ -1151,17 +1134,17 @@ std::unique_ptr<ReadBuffer> StorageAzureSource::createAzureReadBuffer(const Stri
{
auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size);
read_settings.enable_filesystem_cache = false;
//auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
//const bool object_too_small = object_size <= 2 * download_buffer_size;
auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
const bool object_too_small = object_size <= 2 * download_buffer_size;
///// Create a read buffer that will prefetch the first ~1 MB of the file.
///// When reading lots of tiny files, this prefetching almost doubles the throughput.
///// For bigger files, parallel reading is more useful.
//if (object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
//{
// LOG_TRACE(log, "Downloading object {} of size {} from S3 with initial prefetch", key, object_size);
// return object_storage->readObjects({StoredObject(key)}, read_settings, {}, object_size);
//}
// Create a read buffer that will prefetch the first ~1 MB of the file.
// When reading lots of tiny files, this prefetching almost doubles the throughput.
// For bigger files, parallel reading is more useful.
if (object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
LOG_TRACE(log, "Downloading object of size {} from Azure with initial prefetch", object_size);
return createAsyncAzureReadBuffer(key, read_settings, object_size);
}
return object_storage->readObject(StoredObject(key), read_settings, {}, object_size);
}
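The hunk above enables, for Azure, the same small-object heuristic the S3 source already uses: when a blob is no larger than twice max_download_buffer_size and the threadpool read method is active, the source switches to an asynchronous read buffer with an initial prefetch, while bigger blobs keep the plain read path where parallel reading pays off. Below is a minimal standalone sketch of that decision only; Reader, makeAsyncPrefetchReader and makePlainReader are hypothetical stand-ins for the real ReadBuffer factories, not ClickHouse API.

#include <cstddef>
#include <cstdio>
#include <memory>
#include <string>

// Hypothetical stand-in for the real std::unique_ptr<ReadBuffer> results.
struct Reader { std::string kind; };

static std::unique_ptr<Reader> makeAsyncPrefetchReader() { return std::make_unique<Reader>(Reader{"async + initial prefetch"}); }
static std::unique_ptr<Reader> makePlainReader() { return std::make_unique<Reader>(Reader{"plain"}); }

// Mirrors the heuristic in the hunk above: objects no larger than twice the
// download buffer size go through the prefetching async path when the
// threadpool read method is enabled; everything else uses the plain reader.
std::unique_ptr<Reader> chooseReader(size_t object_size, size_t max_download_buffer_size, bool threadpool_read)
{
    const bool object_too_small = object_size <= 2 * max_download_buffer_size;
    if (object_too_small && threadpool_read)
        return makeAsyncPrefetchReader();
    return makePlainReader();
}

int main()
{
    const size_t buffer = 10 * 1024 * 1024; // assume a 10 MiB max_download_buffer_size
    std::printf("1 MiB blob -> %s\n", chooseReader(1 << 20, buffer, true)->kind.c_str());
    std::printf("1 GiB blob -> %s\n", chooseReader(1UL << 30, buffer, true)->kind.c_str());
    return 0;
}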
@@ -1283,6 +1266,20 @@ SchemaCache & StorageAzure::getSchemaCache(const ContextPtr & ctx)
}
std::unique_ptr<ReadBuffer> StorageAzureSource::createAsyncAzureReadBuffer(
const String & key, const ReadSettings & read_settings, size_t object_size)
{
auto modified_settings{read_settings};
modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size;
auto async_reader = object_storage->readObjects(StoredObjects{StoredObject{key, object_size}}, modified_settings);
async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch)
async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY);
return async_reader;
}
}
#endif

View File

@@ -282,6 +282,8 @@ private:
std::future<ReaderHolder> createReaderAsync();
std::unique_ptr<ReadBuffer> createAzureReadBuffer(const String & key, size_t object_size);
std::unique_ptr<ReadBuffer> createAsyncAzureReadBuffer(
const String & key, const ReadSettings & read_settings, size_t object_size);
};
}

View File

@@ -294,8 +294,6 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context) const
{
LOG_INFO(&Poco::Logger::get("TableFunctionS3"), "getActualTableStructure configuration.structure = {} ",configuration.structure);
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
@@ -321,8 +319,6 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
else if (!structure_hint.empty())
columns = structure_hint;
LOG_INFO(&Poco::Logger::get("TableFunctionS3"), "executeImpl structre = {} structure_hint = {} ",configuration.structure, structure_hint.getAll().toString());
StoragePtr storage = std::make_shared<StorageS3>(
configuration,

View File

@@ -62,6 +62,7 @@ def get_azure_file_content(filename):
download_stream = blob_client.download_blob()
return download_stream.readall().decode("utf-8")
def put_azure_file_content(filename, data):
container_name = "cont"
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
@@ -76,7 +77,6 @@ def put_azure_file_content(filename, data):
blob_client.upload_blob(buf)
def test_create_table_connection_string(cluster):
node = cluster.instances["node"]
azure_query(
@@ -243,33 +243,43 @@ def test_create_new_files_on_insert(cluster):
azure_query(node, f"drop table test_multiple_inserts")
def test_overwrite(cluster):
def test_overwrite(cluster):
node = cluster.instances["node"]
azure_query(node, f"create table test_overwrite(a Int32, b String) ENGINE = Azure(azure_conf2, container='cont', blob_path='test_parquet_overwrite', format='Parquet')")
azure_query(
node,
f"create table test_overwrite(a Int32, b String) ENGINE = Azure(azure_conf2, container='cont', blob_path='test_parquet_overwrite', format='Parquet')",
)
azure_query(node, "truncate table test_overwrite")
azure_query(node,
f"insert into test_overwrite select number, randomString(100) from numbers(50) settings azure_truncate_on_insert=1"
azure_query(
node,
f"insert into test_overwrite select number, randomString(100) from numbers(50) settings azure_truncate_on_insert=1",
)
node.query_and_get_error(
f"insert into test_overwrite select number, randomString(100) from numbers(100)"
)
azure_query(node,
f"insert into test_overwrite select number, randomString(100) from numbers(200) settings azure_truncate_on_insert=1"
azure_query(
node,
f"insert into test_overwrite select number, randomString(100) from numbers(200) settings azure_truncate_on_insert=1",
)
result = azure_query(node, f"select count() from test_overwrite")
assert int(result) == 200
def test_insert_with_path_with_globs(cluster):
node = cluster.instances["node"]
azure_query(node, f"create table test_insert_globs(a Int32, b String) ENGINE = Azure(azure_conf2, container='cont', blob_path='test_insert_with_globs*', format='Parquet')")
azure_query(
node,
f"create table test_insert_globs(a Int32, b String) ENGINE = Azure(azure_conf2, container='cont', blob_path='test_insert_with_globs*', format='Parquet')",
)
node.query_and_get_error(
f"insert into table function test_insert_globs SELECT number, randomString(100) FROM numbers(500)"
)
def test_put_get_with_globs(cluster):
# type: (ClickHouseCluster) -> None
unique_prefix = random.randint(1, 10000)
@@ -284,20 +294,26 @@ def test_put_get_with_globs(cluster):
max_path = max(path, max_path)
values = f"({i},{j},{i + j})"
azure_query(node, f"CREATE TABLE test_{i}_{j} ({table_format}) Engine = Azure(azure_conf2, container='cont', blob_path='{path}', format='CSV')")
azure_query(
node,
f"CREATE TABLE test_{i}_{j} ({table_format}) Engine = Azure(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
)
query = f"insert into test_{i}_{j} VALUES {values}"
azure_query(node, query)
azure_query(node, f"CREATE TABLE test_glob_select ({table_format}) Engine = Azure(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV')")
azure_query(
node,
f"CREATE TABLE test_glob_select ({table_format}) Engine = Azure(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV')",
)
query = "select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from test_glob_select"
assert azure_query(node, query).splitlines() == [
"450\t450\t900\t0.csv\t{bucket}/{max_path}".format(
bucket='cont', max_path=max_path
bucket="cont", max_path=max_path
)
]
def test_azure_glob_scheherazade(cluster):
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
@@ -310,7 +326,10 @@ def test_azure_glob_scheherazade(cluster):
for i in range(start, end):
path = "night_{}/tale.csv".format(i)
unique_num = random.randint(1, 10000)
azure_query(node, f"CREATE TABLE test_{i}_{unique_num} ({table_format}) Engine = Azure(azure_conf2, container='cont', blob_path='{path}', format='CSV')")
azure_query(
node,
f"CREATE TABLE test_{i}_{unique_num} ({table_format}) Engine = Azure(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
)
query = f"insert into test_{i}_{unique_num} VALUES {values}"
azure_query(node, query)
@@ -324,11 +343,14 @@
for job in jobs:
job.join()
azure_query(node, f"CREATE TABLE test_glob_select_scheherazade ({table_format}) Engine = Azure(azure_conf2, container='cont', blob_path='night_*/tale.csv', format='CSV')")
azure_query(
node,
f"CREATE TABLE test_glob_select_scheherazade ({table_format}) Engine = Azure(azure_conf2, container='cont', blob_path='night_*/tale.csv', format='CSV')",
)
query = "select count(), sum(column1), sum(column2), sum(column3) from test_glob_select_scheherazade"
assert azure_query(node, query).splitlines() == ["1001\t1001\t1001\t1001"]
@pytest.mark.parametrize(
"extension,method",
[pytest.param("bin", "gzip", id="bin"), pytest.param("gz", "auto", id="gz")],