Merge pull request #34849 from kssenii/fix-too-many-columns

Fix reading too many columns for s3 and url storages
Kseniia Sumarokova 2022-03-03 13:57:22 +01:00 committed by GitHub
commit b11b34dc8c
11 changed files with 119 additions and 30 deletions

View File

@@ -537,7 +537,7 @@ class IColumn;
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
M(UInt64, merge_tree_min_rows_for_concurrent_read_for_remote_filesystem, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized, when reading from remote filesystem.", 0) \
M(UInt64, merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized, when reading from remote filesystem.", 0) \
- M(UInt64, remote_read_min_bytes_for_seek, DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead of read with ignore.", 0) \
+ M(UInt64, remote_read_min_bytes_for_seek, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead of read with ignore.", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
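
The hunk above quadruples the default of remote_read_min_bytes_for_seek. Per the setting's own description, it decides how a remote read buffer (url, s3) skips an unneeded byte range: gaps smaller than the threshold are read and discarded through the open stream, larger gaps trigger a seek, i.e. a new ranged request. A minimal sketch of that decision with hypothetical names; the real logic lives in ClickHouse's remote read buffer implementations:

    #include <cstddef>

    // Hypothetical sketch: skipping `gap_bytes` in a remote stream either
    // re-issues a ranged request (seek, pays request latency) or reads and
    // discards the bytes through the existing connection (ignore).
    enum class SkipMethod { Seek, Ignore };

    SkipMethod chooseSkipMethod(std::size_t gap_bytes, std::size_t min_bytes_for_seek)
    {
        return gap_bytes >= min_bytes_for_seek ? SkipMethod::Seek : SkipMethod::Ignore;
    }

Raising the threshold to 4 * DBMS_DEFAULT_BUFFER_SIZE makes small gaps be read through rather than paying a new request per skip.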

View File

@@ -47,7 +47,7 @@ public:
/// It is useful because column oriented formats could effectively skip unknown columns
/// So we can create a header of only required columns in the read method and ask
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
- bool isColumnOriented() const;
+ bool isColumnOriented() const override;
static ColumnsDescription getTableStructureFromData(
const String & format,

View File

@@ -562,6 +562,8 @@ public:
/// Returns true if all disks of storage are read-only.
virtual bool isStaticStorage() const;
+ virtual bool isColumnOriented() const { return false; }
/// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
/// Used for:
/// - Simple count() optimization
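
The new virtual defaults to false; the storages changed later in this commit override it to delegate to FormatFactory::instance().checkIfFormatIsColumnOriented(format_name). A minimal sketch of such a capability lookup, assuming a fixed set of format names as a stand-in for the factory's per-format metadata (the set shown is illustrative, not the factory's actual list):

    #include <set>
    #include <string>

    // Hypothetical stand-in for FormatFactory::checkIfFormatIsColumnOriented:
    // the real factory consults metadata registered with each input format.
    bool checkIfFormatIsColumnOriented(const std::string & format_name)
    {
        static const std::set<std::string> column_oriented{"Parquet", "ORC", "Arrow"};
        return column_oriented.count(format_name) > 0;
    }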

View File

@@ -64,7 +64,7 @@ public:
/// It is useful because column oriented formats could effectively skip unknown columns
/// So we can create a header of only required columns in the read method and ask
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
- bool isColumnOriented() const;
+ bool isColumnOriented() const override;
bool supportsPartitionBy() const override { return true; }

View File

@@ -615,6 +615,11 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
}
}
+ bool StorageS3::isColumnOriented() const
+ {
+     return FormatFactory::instance().checkIfFormatIsColumnOriented(format_name);
+ }
Pipe StorageS3::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@@ -639,6 +644,20 @@ Pipe StorageS3::read(
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, local_context);
+ ColumnsDescription columns_description;
+ Block block_for_format;
+ if (isColumnOriented())
+ {
+     columns_description = ColumnsDescription{
+         metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()).getNamesAndTypesList()};
+     block_for_format = metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
+ }
+ else
+ {
+     columns_description = metadata_snapshot->getColumns();
+     block_for_format = metadata_snapshot->getSampleBlock();
+ }
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageS3Source>(
@@ -646,10 +665,10 @@
need_file_column,
format_name,
getName(),
- metadata_snapshot->getSampleBlock(),
+ block_for_format,
local_context,
format_settings,
- metadata_snapshot->getColumns(),
+ columns_description,
max_block_size,
max_single_read_retries,
compression_method,
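
For column-oriented formats, the read path above now hands StorageS3Source a header and columns description narrowed to the requested column_names instead of the full table sample block, which is what previously made s3 reads pull every column. A self-contained sketch of the same narrowing, using simplified stand-ins for ClickHouse's Block and ColumnsDescription types:

    #include <algorithm>
    #include <string>
    #include <vector>

    // Simplified stand-ins for Block / ColumnsDescription.
    struct ColumnWithType { std::string name; std::string type; };
    using Header = std::vector<ColumnWithType>;

    // Keep only the requested columns, preserving table order; a column-oriented
    // format (e.g. Parquet) can then skip every column absent from the header.
    Header narrowHeader(const Header & full, const std::vector<std::string> & requested)
    {
        Header narrowed;
        for (const auto & col : full)
            if (std::find(requested.begin(), requested.end(), col.name) != requested.end())
                narrowed.push_back(col);
        return narrowed;
    }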

View File

@@ -218,6 +218,8 @@ private:
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
+ bool isColumnOriented() const override;
};
}

View File

@@ -405,7 +405,7 @@ std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIPara
std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(
const Names & /*column_names*/,
- const StorageMetadataPtr & /*metadata_snapshot*/,
+ const ColumnsDescription & /* columns_description */,
const SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
@@ -482,6 +482,11 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from urls failed. Errors:\n{}", exception_messages);
}
+ bool IStorageURLBase::isColumnOriented() const
+ {
+     return FormatFactory::instance().checkIfFormatIsColumnOriented(format_name);
+ }
Pipe IStorageURLBase::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@@ -493,6 +498,20 @@ Pipe IStorageURLBase::read(
{
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
+ ColumnsDescription columns_description;
+ Block block_for_format;
+ if (isColumnOriented())
+ {
+     columns_description = ColumnsDescription{
+         metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()).getNamesAndTypesList()};
+     block_for_format = metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
+ }
+ else
+ {
+     columns_description = metadata_snapshot->getColumns();
+     block_for_format = metadata_snapshot->getSampleBlock();
+ }
if (urlWithGlobs(uri))
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
@@ -515,14 +534,14 @@
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
- column_names, metadata_snapshot, query_info,
+ column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
- getHeaderBlock(column_names, metadata_snapshot),
+ block_for_format,
local_context,
- metadata_snapshot->getColumns(),
+ columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params, /* glob_url */true));
@@ -537,14 +556,14 @@
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
- column_names, metadata_snapshot, query_info,
+ column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
- getHeaderBlock(column_names, metadata_snapshot),
+ block_for_format,
local_context,
- metadata_snapshot->getColumns(),
+ columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
@@ -561,6 +580,20 @@ Pipe StorageURLWithFailover::read(
size_t max_block_size,
unsigned /*num_streams*/)
{
+ ColumnsDescription columns_description;
+ Block block_for_format;
+ if (isColumnOriented())
+ {
+     columns_description = ColumnsDescription{
+         metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()).getNamesAndTypesList()};
+     block_for_format = metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
+ }
+ else
+ {
+     columns_description = metadata_snapshot->getColumns();
+     block_for_format = metadata_snapshot->getSampleBlock();
+ }
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
@@ -569,14 +602,14 @@ Pipe StorageURLWithFailover::read(
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
- column_names, metadata_snapshot, query_info,
+ column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
- getHeaderBlock(column_names, metadata_snapshot),
+ block_for_format,
local_context,
- metadata_snapshot->getColumns(),
+ columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));

View File

@@ -88,12 +88,14 @@ protected:
virtual std::function<void(std::ostream &)> getReadPOSTDataCallback(
const Names & column_names,
- const StorageMetadataPtr & /*metadata_snapshot*/,
+ const ColumnsDescription & columns_description,
const SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;
+ bool isColumnOriented() const override;
private:
virtual Block getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const = 0;
};

View File

@@ -68,14 +68,14 @@ std::vector<std::pair<std::string, std::string>> StorageXDBC::getReadURIParams(
std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(
const Names & column_names,
- const StorageMetadataPtr & metadata_snapshot,
+ const ColumnsDescription & columns_description,
const SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
String query = transformQueryForExternalDatabase(query_info,
- metadata_snapshot->getColumns().getOrdinary(),
+ columns_description.getOrdinary(),
bridge_helper->getIdentifierQuotingStyle(),
remote_database_name,
remote_table_name,
@@ -85,7 +85,7 @@ std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(
NamesAndTypesList cols;
for (const String & name : column_names)
{
- auto column_data = metadata_snapshot->getColumns().getPhysical(name);
+ auto column_data = columns_description.getPhysical(name);
cols.emplace_back(column_data.name, column_data.type);
}
@@ -114,7 +114,7 @@ Pipe StorageXDBC::read(
return IStorageURLBase::read(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
}
- SinkToStoragePtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
+ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
bridge_helper->startBridgeSync();
@@ -140,6 +140,11 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetad
chooseCompressionMethod(uri, compression_method));
}
+ bool StorageXDBC::isColumnOriented() const
+ {
+     return true;
+ }
Block StorageXDBC::getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const
{
return metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
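
StorageXDBC reports itself column-oriented unconditionally: the bridge sends a real SQL query to the external database, which projects columns server-side, so narrowing the header to the requested columns is always safe. With columns_description now flowing into getReadPOSTDataCallback, the external query is built from the narrowed column set. A simplified sketch of that construction, with a hypothetical helper standing in for transformQueryForExternalDatabase (identifier quoting and WHERE-clause pushdown omitted):

    #include <cstddef>
    #include <string>
    #include <vector>

    // Hypothetical stand-in for transformQueryForExternalDatabase: select only
    // the requested columns so the external database never returns the rest.
    std::string buildExternalSelect(
        const std::vector<std::string> & requested_columns,
        const std::string & remote_database,
        const std::string & remote_table)
    {
        std::string query = "SELECT ";
        for (std::size_t i = 0; i < requested_columns.size(); ++i)
        {
            if (i > 0)
                query += ", ";
            query += requested_columns[i];
        }
        return query + " FROM " + remote_database + "." + remote_table;
    }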

View File

@@ -59,13 +59,15 @@ private:
std::function<void(std::ostream &)> getReadPOSTDataCallback(
const Names & column_names,
- const StorageMetadataPtr & metadata_snapshot,
+ const ColumnsDescription & columns_description,
const SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
Block getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const override;
+ bool isColumnOriented() const override;
};
}

View File

@@ -818,8 +818,9 @@ def test_seekable_formats(started_cluster):
instance.query("SYSTEM FLUSH LOGS")
result = instance.query(f"SELECT formatReadableSize(memory_usage) FROM system.query_log WHERE startsWith(query, 'SELECT count() FROM s3') AND memory_usage > 0 ORDER BY event_time desc")
print(result[:3])
- assert(int(result[:3]) < 200)
+ result = result[:result.index('.')]
+ assert(int(result) < 200)
def test_seekable_formats_url(started_cluster):
@@ -842,8 +843,9 @@ def test_seekable_formats_url(started_cluster):
instance.query("SYSTEM FLUSH LOGS")
result = instance.query(f"SELECT formatReadableSize(memory_usage) FROM system.query_log WHERE startsWith(query, 'SELECT count() FROM url') AND memory_usage > 0 ORDER BY event_time desc")
print(result[:3])
- assert(int(result[:3]) < 200)
+ result = result[:result.index('.')]
+ assert(int(result) < 200)
def test_empty_file(started_cluster):
@@ -886,7 +888,7 @@ def test_s3_schema_inference(started_cluster):
result = instance.query(f"select count(*) from schema_inference")
assert(int(result) == 5000000)
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_native', 'Native')"
result = instance.query(f"desc {table_function}")
assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"
@@ -949,7 +951,7 @@ def test_create_new_files_on_insert(started_cluster):
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
result = instance.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)
@@ -961,11 +963,11 @@
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
result = instance.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)
def test_format_detection(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@@ -1039,6 +1041,29 @@ def test_signatures(started_cluster):
assert(int(result) == 1)
+ def test_select_columns(started_cluster):
+     bucket = started_cluster.minio_bucket
+     instance = started_cluster.instances["dummy"]
+     name = "test_table2"
+     structure = "id UInt32, value1 Int32, value2 Int32"
+     instance.query(f"drop table if exists {name}")
+     instance.query(f"CREATE TABLE {name} ({structure}) ENGINE = S3(s3_conf1, format='Parquet')")
+     limit = 10000000
+     instance.query(f"INSERT INTO {name} SELECT * FROM generateRandom('{structure}') LIMIT {limit} SETTINGS s3_truncate_on_insert=1")
+     instance.query(f"SELECT value2 FROM {name}")
+     instance.query("SYSTEM FLUSH LOGS")
+     result1 = instance.query(f"SELECT read_bytes FROM system.query_log WHERE type='QueryFinish' and query LIKE 'SELECT value2 FROM {name}'")
+     instance.query(f"SELECT * FROM {name}")
+     instance.query("SYSTEM FLUSH LOGS")
+     result2 = instance.query(f"SELECT read_bytes FROM system.query_log WHERE type='QueryFinish' and query LIKE 'SELECT * FROM {name}'")
+     assert(int(result1) * 3 <= int(result2))
def test_insert_select_schema_inference(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@@ -1049,4 +1074,3 @@ def test_insert_select_schema_inference(started_cluster):
result = instance.query(f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow')")
assert(int(result) == 1)