Merge pull request #22299 from nikitamikhaylov/keen-wolf-storagefile-column-oriented

Merging #21302
This commit is contained in:
Nikita Mikhaylov 2021-04-07 16:01:12 +03:00 committed by GitHub
commit 48af7a898c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 109 additions and 18 deletions

View File

@ -412,11 +412,26 @@ void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & na
{
auto & target = dict[name].supports_parallel_formatting;
if (target)
throw Exception("FormatFactory: Output format " + name + " is already marked as supporting parallel formatting.", ErrorCodes::LOGICAL_ERROR);
throw Exception("FormatFactory: Output format " + name + " is already marked as supporting parallel formatting", ErrorCodes::LOGICAL_ERROR);
target = true;
}
void FormatFactory::markFormatAsColumnOriented(const String & name)
{
auto & target = dict[name].is_column_oriented;
if (target)
throw Exception("FormatFactory: Format " + name + " is already marked as column oriented", ErrorCodes::LOGICAL_ERROR);
target = true;
}
bool FormatFactory::checkIfFormatIsColumnOriented(const String & name)
{
const auto & target = getCreators(name);
return target.is_column_oriented;
}
FormatFactory & FormatFactory::instance()
{
static FormatFactory ret;

View File

@ -101,6 +101,7 @@ private:
OutputProcessorCreator output_processor_creator;
FileSegmentationEngine file_segmentation_engine;
bool supports_parallel_formatting{false};
bool is_column_oriented{false};
};
using FormatsDictionary = std::unordered_map<String, Creators>;
@ -155,6 +156,9 @@ public:
void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator);
void markOutputFormatSupportsParallelFormatting(const String & name);
void markFormatAsColumnOriented(const String & name);
bool checkIfFormatIsColumnOriented(const String & name);
const FormatsDictionary & getAllFormats() const
{

View File

@ -101,7 +101,7 @@ void ArrowBlockInputFormat::prepareReader()
record_batch_current = 0;
}
void registerInputFormatProcessorArrow(FormatFactory &factory)
void registerInputFormatProcessorArrow(FormatFactory & factory)
{
factory.registerInputFormatProcessor(
"Arrow",
@ -112,7 +112,7 @@ void registerInputFormatProcessorArrow(FormatFactory &factory)
{
return std::make_shared<ArrowBlockInputFormat>(buf, sample, false);
});
factory.markFormatAsColumnOriented("Arrow");
factory.registerInputFormatProcessor(
"ArrowStream",
[](ReadBuffer & buf,

View File

@ -64,6 +64,7 @@ void registerInputFormatProcessorORC(FormatFactory &factory)
{
return std::make_shared<ORCBlockInputFormat>(buf, sample);
});
factory.markFormatAsColumnOriented("ORC");
}
}

View File

@ -94,6 +94,7 @@ void registerInputFormatProcessorParquet(FormatFactory &factory)
{
return std::make_shared<ParquetBlockInputFormat>(buf, sample);
});
factory.markFormatAsColumnOriented("Parquet");
}
}

View File

@ -22,6 +22,8 @@
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/parseGlobs.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <fcntl.h>
#include <unistd.h>
@ -149,6 +151,11 @@ Strings StorageFile::getPathsList(const String & table_path, const String & user
return paths;
}
bool StorageFile::isColumnOriented() const
{
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatIsColumnOriented(format_name);
}
StorageFile::StorageFile(int table_fd_, CommonArguments args)
: StorageFile(args)
{
@ -227,6 +234,8 @@ static std::chrono::seconds getLockTimeout(const Context & context)
return std::chrono::seconds{lock_timeout};
}
using StorageFilePtr = std::shared_ptr<StorageFile>;
class StorageFileSource : public SourceWithProgress
{
@ -257,6 +266,18 @@ public:
return header;
}
static Block getBlockForSource(
const StorageFilePtr & storage,
const StorageMetadataPtr & metadata_snapshot,
const ColumnsDescription & columns_description,
const FilesInfoPtr & files_info)
{
if (storage->isColumnOriented())
return metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical(), storage->getVirtuals(), storage->getStorageID());
else
return getHeader(metadata_snapshot, files_info->need_path_column, files_info->need_file_column);
}
StorageFileSource(
std::shared_ptr<StorageFile> storage_,
const StorageMetadataPtr & metadata_snapshot_,
@ -264,7 +285,7 @@ public:
UInt64 max_block_size_,
FilesInfoPtr files_info_,
ColumnsDescription columns_description_)
: SourceWithProgress(getHeader(metadata_snapshot_, files_info_->need_path_column, files_info_->need_file_column))
: SourceWithProgress(getBlockForSource(storage_, metadata_snapshot_, columns_description_, files_info_))
, storage(std::move(storage_))
, metadata_snapshot(metadata_snapshot_)
, files_info(std::move(files_info_))
@ -344,8 +365,16 @@ public:
}
read_buf = wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method);
auto get_block_for_format = [&]() -> Block
{
if (storage->isColumnOriented())
return metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
return metadata_snapshot->getSampleBlock();
};
auto format = FormatFactory::instance().getInput(
storage->format_name, *read_buf, metadata_snapshot->getSampleBlock(), context, max_block_size, storage->format_settings);
storage->format_name, *read_buf, get_block_for_format(), context, max_block_size, storage->format_settings);
reader = std::make_shared<InputStreamFromInputFormat>(format);
@ -412,7 +441,6 @@ private:
std::unique_lock<std::shared_timed_mutex> unique_lock;
};
Pipe StorageFile::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -457,9 +485,16 @@ Pipe StorageFile::read(
for (size_t i = 0; i < num_streams; ++i)
{
const auto get_columns_for_format = [&]() -> ColumnsDescription
{
if (isColumnOriented())
return ColumnsDescription{
metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()).getNamesAndTypesList()};
else
return metadata_snapshot->getColumns();
};
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info,
metadata_snapshot->getColumns()));
this_ptr, metadata_snapshot, context, max_block_size, files_info, get_columns_for_format()));
}
return Pipe::unitePipes(std::move(pipes));

View File

@ -64,6 +64,12 @@ public:
static Strings getPathsList(const String & table_path, const String & user_files_path, const Context & context);
/// Check if the format is column-oriented.
/// Is is useful because column oriented formats could effectively skip unknown columns
/// So we can create a header of only required columns in read method and ask
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool isColumnOriented() const;
protected:
friend class StorageFileSource;
friend class StorageFileBlockOutputStream;

View File

@ -291,9 +291,10 @@ Block StorageInMemoryMetadata::getSampleBlockForColumns(
{
Block res;
std::unordered_map<String, DataTypePtr> columns_map;
auto all_columns = getColumns().getAllWithSubcolumns();
std::unordered_map<String, DataTypePtr> columns_map;
columns_map.reserve(all_columns.size());
for (const auto & elem : all_columns)
columns_map.emplace(elem.name, elem.type);
@ -306,15 +307,11 @@ Block StorageInMemoryMetadata::getSampleBlockForColumns(
{
auto it = columns_map.find(name);
if (it != columns_map.end())
{
res.insert({it->second->createColumn(), it->second, it->first});
}
else
{
throw Exception(
"Column " + backQuote(name) + " not found in table " + storage_id.getNameForLogs(),
"Column " + backQuote(name) + " not found in table " + (storage_id.empty() ? "" : storage_id.getNameForLogs()),
ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
}
}
return res;

View File

@ -85,9 +85,10 @@ struct StorageInMemoryMetadata
/// Returns combined set of columns
const ColumnsDescription & getColumns() const;
/// Returns secondary indices
/// Returns secondary indices
const IndicesDescription & getSecondaryIndices() const;
/// Has at least one non primary index
bool hasSecondaryIndices() const;
@ -146,8 +147,7 @@ struct StorageInMemoryMetadata
/// Storage metadata. StorageID required only for more clear exception
/// message.
Block getSampleBlockForColumns(
const Names & column_names, const NamesAndTypesList & virtuals, const StorageID & storage_id) const;
const Names & column_names, const NamesAndTypesList & virtuals = {}, const StorageID & storage_id = StorageID::createEmpty()) const;
/// Returns structure with partition key.
const KeyDescription & getPartitionKey() const;
/// Returns ASTExpressionList of partition key expression for storage or nullptr if there is none.

View File

@ -0,0 +1,12 @@
Parquet
6b397d4643bc1f920f3eb8aa87ee180c -
7fe6d8c57ddc5fe37bbdcb7f73c5fa78 -
d8746733270cbeff7ab3550c9b944fb6 -
Arrow
6b397d4643bc1f920f3eb8aa87ee180c -
7fe6d8c57ddc5fe37bbdcb7f73c5fa78 -
d8746733270cbeff7ab3550c9b944fb6 -
ORC
6b397d4643bc1f920f3eb8aa87ee180c -
7fe6d8c57ddc5fe37bbdcb7f73c5fa78 -
d8746733270cbeff7ab3550c9b944fb6 -

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
FORMATS=('Parquet' 'Arrow' 'ORC')
for format in "${FORMATS[@]}"
do
echo $format
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS 00163_column_oriented SYNC"
$CLICKHOUSE_CLIENT -q "CREATE TABLE 00163_column_oriented(ClientEventTime DateTime, MobilePhoneModel String, ClientIP6 FixedString(16)) ENGINE=File($format)"
$CLICKHOUSE_CLIENT -q "INSERT INTO 00163_column_oriented SELECT ClientEventTime, MobilePhoneModel, ClientIP6 FROM test.hits ORDER BY ClientEventTime, MobilePhoneModel, ClientIP6 LIMIT 100"
$CLICKHOUSE_CLIENT -q "SELECT ClientEventTime from 00163_column_oriented" | md5sum
$CLICKHOUSE_CLIENT -q "SELECT MobilePhoneModel from 00163_column_oriented" | md5sum
$CLICKHOUSE_CLIENT -q "SELECT ClientIP6 from 00163_column_oriented" | md5sum
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS 00163_column_oriented SYNC"
done