fixes after review
commit 942f7d7532
parent dc9dc1676d
@@ -1307,7 +1307,9 @@ try
 throw ErrnoException(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Input must be seekable file (it will be read twice)");
 
 SingleReadBufferIterator read_buffer_iterator(std::move(file));
-schema_columns = readSchemaFromFormat(input_format, {}, read_buffer_iterator, context_const);
+
+std::string sample_string;
+schema_columns = readSchemaFromFormat(input_format, {}, read_buffer_iterator, sample_string, context_const);
 }
 else
 {
@@ -935,6 +935,7 @@ class IColumn;
 M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. Note that it can lead to incorrect result", 0) \
 M(Bool, allow_deprecated_error_prone_window_functions, false, "Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)", 0) \
 M(Bool, allow_deprecated_snowflake_conversion_functions, false, "Enables deprecated functions snowflakeToDateTime[64] and dateTime[64]ToSnowflake.", 0) \
+M(Bool, use_hive_partitioning, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines.", 0)\
 
 // End of COMMON_SETTINGS
 // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS.
@@ -1106,11 +1107,6 @@ class IColumn;
 M(Bool, input_format_tsv_skip_trailing_empty_lines, false, "Skip trailing empty lines in TSV format", 0) \
 M(Bool, input_format_custom_skip_trailing_empty_lines, false, "Skip trailing empty lines in CustomSeparated format", 0) \
 M(Bool, input_format_tsv_crlf_end_of_line, false, "If it is set true, file function will read TSV format with \\r\\n instead of \\n.", 0) \
-M(Bool, file_hive_partitioning, false, "Allows to use hive partitioning for file format", 0)\
-M(Bool, url_hive_partitioning, false, "Allows to use hive partitioning for url format", 0)\
-M(Bool, s3_hive_partitioning, false, "Allows to use hive partitioning for s3 format", 0)\
-M(Bool, azure_blob_storage_hive_partitioning, false, "Allows to use hive partitioning for AzureBlobStorage format", 0)\
-M(Bool, hdfs_hive_partitioning, false, "Allows to use hive partitioning for hdfs format", 0)\
 \
 M(Bool, input_format_native_allow_types_conversion, true, "Allow data types conversion in Native input format", 0) \
 \
@@ -59,11 +59,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
 {
 {"24.7", {{"output_format_parquet_write_page_index", false, true, "Add a possibility to write page index into parquet files."},
 {"optimize_trivial_insert_select", true, false, "The optimization does not make sense in many cases."},
-{"file_hive_partitioning", false, false, "A new settings that allows to use hive partitioning for file format."},
-{"url_hive_partitioning", false, false, "A new settings that allows to use hive partitioning for url format."},
-{"s3_hive_partitioning", false, false, "A new settings that allows to use hive partitioning for s3 format."},
-{"azure_blob_storage_hive_partitioning", false, false, "A new settings that allows to use hive partitioning for AzureBlobStorage format."},
-{"hdfs_hive_partitioning", false, false, "A new settings that allows to use hive partitioning for hdfs format."},
+{"use_hive_partitioning", false, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines."},
 }},
 {"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
 {"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},
@@ -94,6 +94,7 @@ std::pair<ColumnsDescription, String> readSchemaFromFormatImpl(
 std::optional<String> format_name,
 const std::optional<FormatSettings> & format_settings,
 IReadBufferIterator & read_buffer_iterator,
+std::string & sample_path,
 const ContextPtr & context)
 try
 {
@@ -143,6 +144,10 @@ try
 {
 iterator_data = read_buffer_iterator.next();
 
+/// Extracting the File path for hive-style partitioning
+if (sample_path.empty())
+sample_path = read_buffer_iterator.getLastFilePath();
+
 /// Read buffer iterator can determine the data format if it's unknown.
 /// For example by scanning schema cache or by finding new file with format extension.
 if (!format_name && iterator_data.format_name)
@@ -163,7 +168,7 @@ try
 return {*iterator_data.cached_columns, *format_name};
 }
 
-schemas_for_union_mode.emplace_back(iterator_data.cached_columns->getAll(), read_buffer_iterator.getLastFileName());
+schemas_for_union_mode.emplace_back(iterator_data.cached_columns->getAll(), read_buffer_iterator.getLastFilePath());
 continue;
 }
 
@@ -249,7 +254,7 @@ try
 
 if (!names_and_types.empty())
 read_buffer_iterator.setSchemaToLastFile(ColumnsDescription(names_and_types));
-schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFileName());
+schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFilePath());
 }
 catch (...)
 {
@@ -410,7 +415,7 @@ try
 throw Exception(ErrorCodes::CANNOT_DETECT_FORMAT, "The data format cannot be detected by the contents of the files. You can specify the format manually");
 
 read_buffer_iterator.setSchemaToLastFile(ColumnsDescription(names_and_types));
-schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFileName());
+schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFilePath());
 }
 
 if (format_name && mode == SchemaInferenceMode::DEFAULT)
@@ -526,9 +531,9 @@ try
 }
 catch (Exception & e)
 {
-auto file_name = read_buffer_iterator.getLastFileName();
-if (!file_name.empty())
-e.addMessage(fmt::format("(in file/uri {})", file_name));
+auto file_path = read_buffer_iterator.getLastFilePath();
+if (!file_path.empty())
+e.addMessage(fmt::format("(in file/uri {})", file_path));
 throw;
 }
 
@@ -536,17 +541,19 @@ ColumnsDescription readSchemaFromFormat(
 const String & format_name,
 const std::optional<FormatSettings> & format_settings,
 IReadBufferIterator & read_buffer_iterator,
+std::string & sample_path,
 const ContextPtr & context)
 {
-return readSchemaFromFormatImpl(format_name, format_settings, read_buffer_iterator, context).first;
+return readSchemaFromFormatImpl(format_name, format_settings, read_buffer_iterator, sample_path, context).first;
 }
 
 std::pair<ColumnsDescription, String> detectFormatAndReadSchema(
 const std::optional<FormatSettings> & format_settings,
 IReadBufferIterator & read_buffer_iterator,
+std::string & sample_path,
 const ContextPtr & context)
 {
-return readSchemaFromFormatImpl(std::nullopt, format_settings, read_buffer_iterator, context);
+return readSchemaFromFormatImpl(std::nullopt, format_settings, read_buffer_iterator, sample_path, context);
 }
 
 SchemaCache::Key getKeyForSchemaCache(
@@ -56,8 +56,8 @@ struct IReadBufferIterator
 /// Set auto detected format name.
 virtual void setFormatName(const String & /*format_name*/) {}
 
-/// Get last processed file name for better exception messages.
-virtual String getLastFileName() const { return ""; }
+/// Get last processed file path for better exception messages.
+virtual String getLastFilePath() const { return ""; }
 
 /// Return true if method recreateLastReadBuffer is implemented.
 virtual bool supportsLastReadBufferRecreation() const { return false; }
@@ -122,6 +122,7 @@ ColumnsDescription readSchemaFromFormat(
 const String & format_name,
 const std::optional<FormatSettings> & format_settings,
 IReadBufferIterator & read_buffer_iterator,
+std::string & sample_path,
 const ContextPtr & context);
 
 /// Try to detect the format of the data and it's schema.
@@ -131,6 +132,7 @@ ColumnsDescription readSchemaFromFormat(
 std::pair<ColumnsDescription, String> detectFormatAndReadSchema(
 const std::optional<FormatSettings> & format_settings,
 IReadBufferIterator & read_buffer_iterator,
+std::string & sample_path,
 const ContextPtr & context);
 
 SchemaCache::Key getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
@@ -89,8 +89,9 @@ public:
 {
 ConfigurationPtr configuration = base_configuration->clone();
 configuration->setPaths(metadata->getDataFiles());
+std::string sample_string;
 return Storage::resolveSchemaFromData(
-object_storage_, configuration, format_settings_, local_context);
+object_storage_, configuration, format_settings_, sample_string, local_context);
 }
 }
 
@@ -131,7 +131,7 @@ void ReadBufferIterator::setFormatName(const String & format_name)
 format = format_name;
 }
 
-String ReadBufferIterator::getLastFileName() const
+String ReadBufferIterator::getLastFilePath() const
 {
 if (current_object_info)
 return current_object_info->getPath();
@@ -33,7 +33,7 @@ public:
 
 void setResultingSchema(const ColumnsDescription & columns) override;
 
-String getLastFileName() const override;
+String getLastFilePath() const override;
 
 void setFormatName(const String & format_name) override;
 
@@ -33,17 +33,22 @@ namespace ErrorCodes
 extern const int LOGICAL_ERROR;
 }
 
-bool checkIfHiveSettingEnabled(const ContextPtr & context, const std::string & storage_type_name)
+std::string StorageObjectStorage::getPathSample(StorageInMemoryMetadata metadata, ContextPtr context)
 {
-if (storage_type_name == "s3")
-return context->getSettings().s3_hive_partitioning;
-else if (storage_type_name == "hdfs")
-return context->getSettings().hdfs_hive_partitioning;
-else if (storage_type_name == "azure")
-return context->getSettings().azure_blob_storage_hive_partitioning;
-else
-throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported storage type: {}", storage_type_name);
+auto file_iterator = StorageObjectStorageSource::createFileIterator(
+configuration,
+object_storage,
+distributed_processing,
+context,
+{}, // predicate
+metadata.getColumns().getAll(), // virtual_columns
+nullptr, // read_keys
+{} // file_progress_callback
+);
 
+if (auto file = file_iterator->next(0))
+return file->getPath();
+return "";
 }
 
 StorageObjectStorage::StorageObjectStorage(
@@ -66,7 +71,9 @@ StorageObjectStorage::StorageObjectStorage(
 , log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName())))
 {
 ColumnsDescription columns{columns_};
-resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, context);
+
+std::string sample_path;
+resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context);
 configuration->check(context);
 
 StorageInMemoryMetadata metadata;
@@ -74,23 +81,13 @@ StorageObjectStorage::StorageObjectStorage(
 metadata.setConstraints(constraints_);
 metadata.setComment(comment);
 
-auto file_iterator = StorageObjectStorageSource::createFileIterator(
-configuration,
-object_storage,
-distributed_processing_,
-context,
-{}, // predicate
-metadata.getColumns().getAll(), // virtual_columns
-nullptr, // read_keys
-{} // file_progress_callback
-);
+if (sample_path.empty() && context->getSettings().use_hive_partitioning)
+sample_path = getPathSample(metadata, context);
+else if (!context->getSettings().use_hive_partitioning)
+sample_path = "";
 
-Strings paths;
+setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns(), sample_path));
 
-if (checkIfHiveSettingEnabled(context, configuration->getTypeName()))
-if (auto file = file_iterator->next(0))
-paths = {file->getPath()};
-setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns(), paths));
 setInMemoryMetadata(metadata);
 }
 
@@ -386,33 +383,36 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(
 const ObjectStoragePtr & object_storage,
 const ConfigurationPtr & configuration,
 const std::optional<FormatSettings> & format_settings,
+std::string & sample_path,
 const ContextPtr & context)
 {
 ObjectInfos read_keys;
 auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
-return readSchemaFromFormat(configuration->format, format_settings, *iterator, context);
+return readSchemaFromFormat(configuration->format, format_settings, *iterator, sample_path, context);
 }
 
 std::string StorageObjectStorage::resolveFormatFromData(
 const ObjectStoragePtr & object_storage,
 const ConfigurationPtr & configuration,
 const std::optional<FormatSettings> & format_settings,
+std::string & sample_path,
 const ContextPtr & context)
 {
 ObjectInfos read_keys;
 auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
-return detectFormatAndReadSchema(format_settings, *iterator, context).second;
+return detectFormatAndReadSchema(format_settings, *iterator, sample_path, context).second;
 }
 
 std::pair<ColumnsDescription, std::string> StorageObjectStorage::resolveSchemaAndFormatFromData(
 const ObjectStoragePtr & object_storage,
 const ConfigurationPtr & configuration,
 const std::optional<FormatSettings> & format_settings,
+std::string & sample_path,
 const ContextPtr & context)
 {
 ObjectInfos read_keys;
 auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
-auto [columns, format] = detectFormatAndReadSchema(format_settings, *iterator, context);
+auto [columns, format] = detectFormatAndReadSchema(format_settings, *iterator, sample_path, context);
 configuration->format = format;
 return std::pair(columns, format);
 }
@@ -42,6 +42,7 @@ public:
 size_t list_object_keys_size;
 bool throw_on_zero_files_match;
 bool ignore_non_existent_file;
+bool use_hive_partitioning;
 };
 
 StorageObjectStorage(
@@ -100,23 +101,28 @@ public:
 const ObjectStoragePtr & object_storage,
 const ConfigurationPtr & configuration,
 const std::optional<FormatSettings> & format_settings,
+std::string & sample_path,
 const ContextPtr & context);
 
 static std::string resolveFormatFromData(
 const ObjectStoragePtr & object_storage,
 const ConfigurationPtr & configuration,
 const std::optional<FormatSettings> & format_settings,
+std::string & sample_path,
 const ContextPtr & context);
 
 static std::pair<ColumnsDescription, std::string> resolveSchemaAndFormatFromData(
 const ObjectStoragePtr & object_storage,
 const ConfigurationPtr & configuration,
 const std::optional<FormatSettings> & format_settings,
+std::string & sample_path,
 const ContextPtr & context);
 
 protected:
 virtual void updateConfiguration(ContextPtr local_context);
 
+std::string getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
+
 static std::unique_ptr<ReadBufferIterator> createReadBufferIterator(
 const ObjectStoragePtr & object_storage,
 const ConfigurationPtr & configuration,
@@ -33,7 +33,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
 , object_storage(object_storage_)
 {
 ColumnsDescription columns{columns_};
-resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, {}, context_);
+std::string sample_path;
+resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, {}, sample_path, context_);
 configuration->check(context_);
 
 StorageInMemoryMetadata metadata;
@@ -197,16 +197,6 @@ Chunk StorageObjectStorageSource::generate()
 const auto & filename = object_info->getFileName();
 chassert(object_info->metadata);
 
-auto hive_map = VirtualColumnUtils::parsePartitionMapFromPath(object_info->getPath());
-bool contains_virtual_column = std::any_of(hive_map.begin(), hive_map.end(),
-[&](const auto& pair)
-{
-return read_from_format_info.requested_virtual_columns.contains(pair.first);
-});
-
-if (!contains_virtual_column)
-hive_map.clear(); // If we cannot find any virtual column in requested, we don't add any of them to chunk
-
 VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
 chunk, read_from_format_info.requested_virtual_columns,
 {
@@ -214,8 +204,7 @@ Chunk StorageObjectStorageSource::generate()
 .size = object_info->metadata->size_bytes,
 .filename = &filename,
 .last_modified = object_info->metadata->last_modified,
-.hive_partitioning_map = hive_map
-});
+}, object_info->getPath());
 return chunk;
 }
 
@@ -49,19 +49,20 @@ void resolveSchemaAndFormat(
 ObjectStoragePtr object_storage,
 const StorageObjectStorage::ConfigurationPtr & configuration,
 std::optional<FormatSettings> format_settings,
+std::string & sample_path,
 const ContextPtr & context)
 {
 if (columns.empty())
 {
 if (format == "auto")
 std::tie(columns, format) =
-StorageObjectStorage::resolveSchemaAndFormatFromData(object_storage, configuration, format_settings, context);
+StorageObjectStorage::resolveSchemaAndFormatFromData(object_storage, configuration, format_settings, sample_path, context);
 else
-columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, context);
+columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, sample_path, context);
 }
 else if (format == "auto")
 {
-format = StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, context);
+format = StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, sample_path, context);
 }
 
 if (!columns.hasOnlyOrdinary())
@@ -19,6 +19,7 @@ void resolveSchemaAndFormat(
 ObjectStoragePtr object_storage,
 const StorageObjectStorage::ConfigurationPtr & configuration,
 std::optional<FormatSettings> format_settings,
+std::string & sample_path,
 const ContextPtr & context);
 
 }
@@ -160,7 +160,8 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
 configuration->check(context_);
 
 ColumnsDescription columns{columns_};
-resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, context_);
+std::string sample_path;
+resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context_);
 configuration->check(context_);
 
 StorageInMemoryMetadata storage_metadata;
@@ -502,7 +502,7 @@ namespace
 StorageFile::getSchemaCache(getContext()).addManyColumns(cache_keys, columns);
 }
 
-String getLastFileName() const override
+String getLastFilePath() const override
 {
 if (current_index != 0)
 return paths[current_index - 1];
@@ -777,7 +777,7 @@ namespace
 format = format_name;
 }
 
-String getLastFileName() const override
+String getLastFilePath() const override
 {
 return last_read_file_path;
 }
@@ -880,10 +880,11 @@ std::pair<ColumnsDescription, String> StorageFile::getTableStructureAndFormatFro
 auto read_buffer_iterator = SingleReadBufferIterator(std::move(read_buf));
 
 ColumnsDescription columns;
+std::string sample_path;
 if (format)
-columns = readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context);
+columns = readSchemaFromFormat(*format, format_settings, read_buffer_iterator, sample_path, context);
 else
-std::tie(columns, format) = detectFormatAndReadSchema(format_settings, read_buffer_iterator, context);
+std::tie(columns, format) = detectFormatAndReadSchema(format_settings, read_buffer_iterator, sample_path, context);
 
 peekable_read_buffer_from_fd = read_buffer_iterator.releaseBuffer();
 if (peekable_read_buffer_from_fd)
@@ -928,20 +929,21 @@ std::pair<ColumnsDescription, String> StorageFile::getTableStructureAndFormatFro
 
 }
 
+std::string sample_path;
 if (archive_info)
 {
 ReadBufferFromArchiveIterator read_buffer_iterator(*archive_info, format, format_settings, context);
 if (format)
-return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format};
+return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, sample_path, context), *format};
 
-return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context);
+return detectFormatAndReadSchema(format_settings, read_buffer_iterator, sample_path, context);
 }
 
 ReadBufferFromFileIterator read_buffer_iterator(paths, format, compression_method, format_settings, context);
 if (format)
-return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format};
+return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, sample_path, context), *format};
 
-return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context);
+return detectFormatAndReadSchema(format_settings, read_buffer_iterator, sample_path, context);
 }
 
 ColumnsDescription StorageFile::getTableStructureFromFile(
@@ -1097,10 +1099,10 @@ void StorageFile::setStorageMetadata(CommonArguments args)
 storage_metadata.setComment(args.comment);
 setInMemoryMetadata(storage_metadata);
 
-Strings paths_for_virtuals;
-if (args.getContext()->getSettingsRef().file_hive_partitioning)
-paths_for_virtuals = paths;
-setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), paths_for_virtuals));
+std::string path_for_virtuals;
+if (args.getContext()->getSettingsRef().use_hive_partitioning && !paths.empty())
+path_for_virtuals = paths[0];
+setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), path_for_virtuals, format_settings.value_or(FormatSettings{})));
 }
 
 
@@ -1442,14 +1444,9 @@ Chunk StorageFileSource::generate()
 chunk_size = input_format->getApproxBytesReadForChunk();
 progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
 
-std::map<std::string, std::string> hive_map;
-if (getContext()->getSettingsRef().file_hive_partitioning)
-{
-hive_map = VirtualColumnUtils::parsePartitionMapFromPath(current_path);
-
-for (const auto& item : hive_map)
-requested_virtual_columns.push_back(NameAndTypePair(item.first, std::make_shared<DataTypeString>()));
-}
+std::string hive_partitioning_path;
+if (getContext()->getSettingsRef().use_hive_partitioning)
+hive_partitioning_path = current_path;
 
 /// Enrich with virtual columns.
 VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
@@ -1459,8 +1456,7 @@ Chunk StorageFileSource::generate()
 .size = current_file_size,
 .filename = (filename_override.has_value() ? &filename_override.value() : nullptr),
 .last_modified = current_file_last_modified,
-.hive_partitioning_map = hive_map
-});
+}, hive_partitioning_path);
 
 return chunk;
 }
@@ -1636,16 +1632,6 @@ void ReadFromFile::createIterator(const ActionsDAG::Node * predicate)
 storage->distributed_processing);
 }
 
-void addPartitionColumnsToInfoHeader(Strings paths, ReadFromFormatInfo & info)
-{
-for (const auto& path : paths)
-{
-auto map = VirtualColumnUtils::parsePartitionMapFromPath(path);
-for (const auto& item : map)
-info.source_header.insertUnique(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), item.first));
-}
-}
-
 void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
 {
 createIterator(nullptr);
@@ -1665,9 +1651,6 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui
 paths = storage->paths;
 }
 
-if (getContext()->getSettingsRef().file_hive_partitioning)
-addPartitionColumnsToInfoHeader(paths, info);
-
 if (max_num_streams > files_to_read)
 num_streams = files_to_read;
 
@@ -153,10 +153,10 @@ IStorageURLBase::IStorageURLBase(
 storage_metadata.setComment(comment);
 setInMemoryMetadata(storage_metadata);
 
-Strings uri_for_partitioning;
-if (context_->getSettingsRef().url_hive_partitioning)
-uri_for_partitioning = {uri};
-setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), uri_for_partitioning));
+std::string uri_for_partitioning;
+if (context_->getSettingsRef().use_hive_partitioning)
+uri_for_partitioning = uri;
+setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), uri_for_partitioning, format_settings.value_or(FormatSettings{})));
 }
 
 
@@ -415,9 +415,9 @@ Chunk StorageURLSource::generate()
 size_t chunk_size = 0;
 if (input_format)
 chunk_size = input_format->getApproxBytesReadForChunk();
-std::map<std::string, std::string> hive_map;
-if (getContext()->getSettingsRef().url_hive_partitioning)
-hive_map = VirtualColumnUtils::parsePartitionMapFromPath(curr_uri.getPath());
+std::string hive_partitioning_path;
+if (getContext()->getSettingsRef().use_hive_partitioning)
+hive_partitioning_path = curr_uri.getPath();
 
 progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
 VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
@@ -425,8 +425,7 @@ Chunk StorageURLSource::generate()
 {
 .path = curr_uri.getPath(),
 .size = current_file_size,
-.hive_partitioning_map = hive_map
-});
+}, hive_partitioning_path);
 
 return chunk;
 }
@@ -859,7 +858,7 @@ namespace
 format = format_name;
 }
 
-String getLastFileName() const override { return current_url_option; }
+String getLastFilePath() const override { return current_url_option; }
 
 bool supportsLastReadBufferRecreation() const override { return true; }
 
@@ -960,9 +959,10 @@ std::pair<ColumnsDescription, String> IStorageURLBase::getTableStructureAndForma
 urls_to_check = {uri};
 
 ReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context);
+std::string sample_path;
 if (format)
-return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format};
-return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context);
+return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, sample_path, context), *format};
+return detectFormatAndReadSchema(format_settings, read_buffer_iterator, sample_path, context);
 }
 
 ColumnsDescription IStorageURLBase::getTableStructureFromData(
@@ -38,6 +38,7 @@
 #include <IO/WriteHelpers.h>
 #include <Common/re2.h>
 #include <Common/typeid_cast.h>
+#include <Formats/SchemaInferenceUtils.h>
 #include "Functions/FunctionsLogical.h"
 #include "Functions/IFunction.h"
 #include "Functions/IFunctionAdaptors.h"
@@ -115,22 +116,19 @@ NameSet getVirtualNamesForFileLikeStorage()
 return {"_path", "_file", "_size", "_time"};
 }
 
-Strings parseVirtualColumnNameFromPath(const std::string & path)
+std::map<std::string, std::string> parseFromPath(const std::string& path)
 {
 std::string pattern = "/([^/]+)=([^/]+)";
-// Map to store the key-value pairs
-std::map<std::string, std::string> key_values;
 
 re2::StringPiece input_piece(path);
-std::string key;
-Strings result;
-while (RE2::FindAndConsume(&input_piece, pattern, &key))
-result.push_back(key);
-
-return result;
+std::map<std::string, std::string> key_values;
+std::string key, value;
+while (RE2::FindAndConsume(&input_piece, pattern, &key, &value))
+key_values["_" + key] = value;
+return key_values;
 }
 
-VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns, Strings paths)
+VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns, std::string path, FormatSettings settings)
 {
 VirtualColumnsDescription desc;
 
|
|||||||
add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
|
add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
|
||||||
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
|
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
|
||||||
|
|
||||||
for (const auto& path : paths)
|
auto map = parseFromPath(path);
|
||||||
|
for (const auto& item : map)
|
||||||
{
|
{
|
||||||
auto names = parseVirtualColumnNameFromPath(path);
|
auto type = tryInferDataTypeForSingleField(item.second, settings);
|
||||||
for (const auto& name : names)
|
if (type == nullptr)
|
||||||
add_virtual("_" + name, std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
|
type = std::make_shared<DataTypeString>();
|
||||||
|
add_virtual(item.first, std::make_shared<DataTypeLowCardinality>(type));
|
||||||
}
|
}
|
||||||
|
|
||||||
return desc;
|
return desc;
|
||||||
@ -213,25 +213,11 @@ ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const
|
|||||||
return block.getByName("_idx").column;
|
return block.getByName("_idx").column;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::map<std::string, std::string> parsePartitionMapFromPath(const std::string & path)
|
|
||||||
{
|
|
||||||
std::string pattern = "/([^/]+)=([^/]+)"; // Regex to capture key=value pairs
|
|
||||||
// Map to store the key-value pairs
|
|
||||||
std::map<std::string, std::string> key_values;
|
|
||||||
|
|
||||||
re2::StringPiece input_piece(path);
|
|
||||||
std::string key;
|
|
||||||
std::string value;
|
|
||||||
while (RE2::FindAndConsume(&input_piece, pattern, &key, &value))
|
|
||||||
key_values["_" + key] = value;
|
|
||||||
|
|
||||||
return key_values;
|
|
||||||
}
|
|
||||||
|
|
||||||
void addRequestedFileLikeStorageVirtualsToChunk(
|
void addRequestedFileLikeStorageVirtualsToChunk(
|
||||||
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns,
|
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns,
|
||||||
VirtualsForFileLikeStorage virtual_values)
|
VirtualsForFileLikeStorage virtual_values, const std::string & hive_partitioning_path)
|
||||||
{
|
{
|
||||||
|
auto hive_map = parseFromPath(hive_partitioning_path);
|
||||||
for (const auto & virtual_column : requested_virtual_columns)
|
for (const auto & virtual_column : requested_virtual_columns)
|
||||||
{
|
{
|
||||||
if (virtual_column.name == "_path")
|
if (virtual_column.name == "_path")
|
||||||
@ -265,13 +251,22 @@ void addRequestedFileLikeStorageVirtualsToChunk(
|
|||||||
else
|
else
|
||||||
chunk.addColumn(virtual_column.type->createColumnConstWithDefaultValue(chunk.getNumRows())->convertToFullColumnIfConst());
|
chunk.addColumn(virtual_column.type->createColumnConstWithDefaultValue(chunk.getNumRows())->convertToFullColumnIfConst());
|
||||||
}
|
}
|
||||||
else
|
else if (!hive_map.empty())
|
||||||
{
|
{
|
||||||
auto it = virtual_values.hive_partitioning_map.find(virtual_column.getNameInStorage());
|
bool contains_virtual_column = std::any_of(hive_map.begin(), hive_map.end(),
|
||||||
if (it != virtual_values.hive_partitioning_map.end())
|
[&](const auto& pair)
|
||||||
|
{
|
||||||
|
return requested_virtual_columns.contains(pair.first);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!contains_virtual_column)
|
||||||
|
hive_map.clear(); // If we cannot find any virtual column in requested, we don't add any of them to chunk
|
||||||
|
|
||||||
|
auto it = hive_map.find(virtual_column.getNameInStorage());
|
||||||
|
if (it != hive_map.end())
|
||||||
{
|
{
|
||||||
chunk.addColumn(virtual_column.getTypeInStorage()->createColumnConst(chunk.getNumRows(), it->second)->convertToFullColumnIfConst());
|
chunk.addColumn(virtual_column.getTypeInStorage()->createColumnConst(chunk.getNumRows(), it->second)->convertToFullColumnIfConst());
|
||||||
virtual_values.hive_partitioning_map.erase(it);
|
hive_map.erase(it);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -5,6 +5,7 @@
 #include <Parsers/IAST_fwd.h>
 #include <Storages/SelectQueryInfo.h>
 #include <Storages/VirtualColumnsDescription.h>
+#include <Formats/FormatSettings.h>
 
 #include <map>
 #include <string>
@@ -49,7 +50,7 @@ auto extractSingleValueFromBlock(const Block & block, const String & name)
 }
 
 NameSet getVirtualNamesForFileLikeStorage();
-VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns, Strings paths = {});
+VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns, std::string path = "", FormatSettings settings = FormatSettings());
 
 ActionsDAGPtr createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
 
@@ -76,15 +77,13 @@ struct VirtualsForFileLikeStorage
 std::optional<size_t> size { std::nullopt };
 const String * filename { nullptr };
 std::optional<Poco::Timestamp> last_modified { std::nullopt };
-std::map<std::string, std::string> hive_partitioning_map {};
-
 };
 
-std::map<std::string, std::string> parsePartitionMapFromPath(const std::string & path);
+std::map<std::string, std::string> parseFromPath(const std::string& path);
 
 void addRequestedFileLikeStorageVirtualsToChunk(
 Chunk & chunk, const NamesAndTypesList & requested_virtual_columns,
-VirtualsForFileLikeStorage virtual_values);
+VirtualsForFileLikeStorage virtual_values, const std::string & hive_partitioning_path = "");
 }
 
 }
@@ -85,9 +85,10 @@ ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr conte
 if (structure == "auto")
 {
 SingleReadBufferIterator read_buffer_iterator(std::make_unique<ReadBufferFromString>(data));
+std::string sample_path;
 if (format == "auto")
-return detectFormatAndReadSchema(std::nullopt, read_buffer_iterator, context).first;
-return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, context);
+return detectFormatAndReadSchema(std::nullopt, read_buffer_iterator, sample_path, context).first;
+return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, sample_path, context);
 }
 return parseColumnsListFromString(structure, context);
 }
@@ -131,11 +132,12 @@ StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, Con
 String format_name = format;
 if (structure == "auto")
 {
+std::string sample_path;
 SingleReadBufferIterator read_buffer_iterator(std::make_unique<ReadBufferFromString>(data));
 if (format_name == "auto")
-std::tie(columns, format_name) = detectFormatAndReadSchema(std::nullopt, read_buffer_iterator, context);
+std::tie(columns, format_name) = detectFormatAndReadSchema(std::nullopt, read_buffer_iterator, sample_path, context);
 else
-columns = readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, context);
+columns = readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, sample_path, context);
 }
 else
 {
@@ -84,7 +84,8 @@ ColumnsDescription TableFunctionObjectStorage<
 context->checkAccess(getSourceAccessType());
 ColumnsDescription columns;
 auto storage = getObjectStorage(context, !is_insert_query);
-resolveSchemaAndFormat(columns, configuration->format, storage, configuration, std::nullopt, context);
+std::string sample_path;
+resolveSchemaAndFormat(columns, configuration->format, storage, configuration, std::nullopt, sample_path, context);
 return columns;
 }
 else
@@ -1,39 +0,0 @@
-<clickhouse>
-<remote_servers>
-<simple_cluster>
-<shard>
-<replica>
-<host>node_0</host>
-<port>9000</port>
-</replica>
-<replica>
-<host>node_1</host>
-<port>9000</port>
-</replica>
-<replica>
-<host>node_2</host>
-<port>9000</port>
-</replica>
-</shard>
-</simple_cluster>
-
-<cluster_non_existent_port>
-<shard>
-<replica>
-<host>node_0</host>
-<port>9000</port>
-</replica>
-</shard>
-<shard>
-<replica>
-<host>node_1</host>
-<port>19000</port>
-</replica>
-</shard>
-</cluster_non_existent_port>
-
-</remote_servers>
-<macros>
-<default_cluster_macro>simple_cluster</default_cluster_macro>
-</macros>
-</clickhouse>
@@ -1,33 +0,0 @@
-<clickhouse>
-<remote_servers>
-<cluster_non_existent_port>
-<shard>
-<replica>
-<host>node1</host>
-<port>9000</port>
-</replica>
-</shard>
-<shard>
-<replica>
-<host>node1</host>
-<port>19000</port>
-</replica>
-</shard>
-</cluster_non_existent_port>
-
-<test_cluster_two_shards>
-<shard>
-<replica>
-<host>127.0.0.1</host>
-<port>9000</port>
-</replica>
-</shard>
-<shard>
-<replica>
-<host>127.0.0.2</host>
-<port>9000</port>
-</replica>
-</shard>
-</test_cluster_two_shards>
-</remote_servers>
-</clickhouse>
@@ -1,9 +0,0 @@
-<!-- Sometime azurite is super slow, profiler make it even worse -->
-<clickhouse>
-<profiles>
-<default>
-<query_profiler_real_time_period_ns>0</query_profiler_real_time_period_ns>
-<query_profiler_cpu_time_period_ns>0</query_profiler_cpu_time_period_ns>
-</default>
-</profiles>
-</clickhouse>
@@ -1,5 +0,0 @@
-<clickhouse>
-<macros>
-<default_cluster_macro>test_cluster_two_shards</default_cluster_macro>
-</macros>
-</clickhouse>
@@ -1,14 +0,0 @@
-<clickhouse>
-<named_collections>
-<azure_conf1>
-<container>cont</container>
-<blob_path>test_simple_write_named.csv</blob_path>
-<structure>key UInt64, data String</structure>
-<format>CSV</format>
-</azure_conf1>
-<azure_conf2>
-<account_name>devstoreaccount1</account_name>
-<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
-</azure_conf2>
-</named_collections>
-</clickhouse>
@@ -1,3 +0,0 @@
-<clickhouse>
-<schema_inference_cache_max_elements_for_azure>2</schema_inference_cache_max_elements_for_azure>
-</clickhouse>
@@ -1,3 +0,0 @@
-<clickhouse>
-<schema_inference_cache_max_elements_for_hdfs>2</schema_inference_cache_max_elements_for_hdfs>
-</clickhouse>
@@ -1,9 +0,0 @@
-<clickhouse>
-<users>
-<default>
-<password></password>
-<profile>default</profile>
-<named_collection_control>1</named_collection_control>
-</default>
-</users>
-</clickhouse>
@@ -1,219 +0,0 @@
-#!/usr/bin/env python3
-
-import pytest
-import time
-
-from helpers.cluster import ClickHouseCluster, is_arm
-import re
-
-from azure.storage.blob import BlobServiceClient
-from helpers.cluster import ClickHouseCluster, ClickHouseInstance
-
-if is_arm():
-    pytestmark = pytest.mark.skip
-
-
-@pytest.fixture(scope="module")
-def cluster():
-    try:
-        cluster = ClickHouseCluster(__file__)
-        cluster.add_instance(
-            "node",
-            main_configs=[
-                "configs/named_collections_azure.xml",
-                "configs/schema_cache_azure.xml",
-            ],
-            user_configs=[
-                "configs/disable_profilers_azure.xml",
-                "configs/users_azure.xml",
-            ],
-            with_azurite=True,
-        )
-        cluster.start()
-        container_client = cluster.blob_service_client.get_container_client("cont")
-        container_client.create_container()
-        yield cluster
-    finally:
-        cluster.shutdown()
-
-
-def azure_query(
-    node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None
-):
-    for i in range(try_num):
-        try:
-            if expect_error:
-                return node.query_and_get_error(query, settings=settings)
-            else:
-                return node.query(query, settings=settings)
-        except Exception as ex:
-            retriable_errors = [
-                "DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
-                "DB::Exception: Azure::Core::Http::TransportException: Connection closed before getting full response or response is less than expected",
-                "DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
-                "DB::Exception: Azure::Core::Http::TransportException: Error while polling for socket ready read",
-                "Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
-                "Azure::Core::Http::TransportException, e.what() = Connection closed before getting full response or response is less than expected",
-                "Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
-                "Azure::Core::Http::TransportException, e.what() = Error while polling for socket ready read",
-            ]
-            retry = False
-            for error in retriable_errors:
-                if error in str(ex):
-                    retry = True
-                    print(f"Try num: {i}. Having retriable error: {ex}")
-                    time.sleep(i)
-                    break
-            if not retry or i == try_num - 1:
-                raise Exception(ex)
-            if query_on_retry is not None:
-                node.query(query_on_retry)
-            continue
-
-
-def get_azure_file_content(filename, port):
-    container_name = "cont"
-    connection_string = (
-        f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
-        f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
-        f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
-    )
-    blob_service_client = BlobServiceClient.from_connection_string(
-        str(connection_string)
-    )
-    container_client = blob_service_client.get_container_client(container_name)
-    blob_client = container_client.get_blob_client(filename)
-    download_stream = blob_client.download_blob()
-    return download_stream.readall().decode("utf-8")
-
-
-@pytest.fixture(autouse=True, scope="function")
-def delete_all_files(cluster):
-    port = cluster.env_variables["AZURITE_PORT"]
-    connection_string = (
-        f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
-        f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
-        f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
-    )
-    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
-    containers = blob_service_client.list_containers()
-    for container in containers:
-        container_client = blob_service_client.get_container_client(container)
-        blob_list = container_client.list_blobs()
-        for blob in blob_list:
-            print(blob)
-            blob_client = container_client.get_blob_client(blob)
-            blob_client.delete_blob()
-
-        assert len(list(container_client.list_blobs())) == 0
-
-    yield
-
-
-def test_azure_partitioning_with_one_parameter(cluster):
-    # type: (ClickHouseCluster) -> None
-    node = cluster.instances["node"]  # type: ClickHouseInstance
-    table_format = "column1 String, column2 String"
-    values = f"('Elizabeth', 'Gordon')"
-    path = "a/column1=Elizabeth/sample.csv"
-
-    azure_query(
-        node,
-        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
-        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}",
-    )
-
-    query = (
-        f"SELECT column1, column2, _file, _path, _column1 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}')"
-    )
-    assert azure_query(
-        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
-    ).splitlines() == [
-        "Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth".format(
-            bucket="cont", max_path=path
-        )
-    ]
-
-    query = (
-        f"SELECT column2 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
-    )
-    assert azure_query(
-        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
-    ).splitlines() == ["Gordon"]
-
-
-def test_azure_partitioning_with_two_parameters(cluster):
-    # type: (ClickHouseCluster) -> None
-    node = cluster.instances["node"]  # type: ClickHouseInstance
-    table_format = "column1 String, column2 String"
-    values_1 = f"('Elizabeth', 'Gordon')"
-    values_2 = f"('Emilia', 'Gregor')"
-    path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
-
-    azure_query(
-        node,
-        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
-        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
-    )
-
-    query = (
-        f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
-    )
-    assert azure_query(
-        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
-    ).splitlines() == [
-        "Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth\tGordon".format(
-            bucket="cont", max_path=path
-        )
-    ]
-
-    query = (
-        f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2;"
-    )
-    assert azure_query(
-        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
-    ).splitlines() == ["Elizabeth"]
-
-    query = (
-        f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2 AND column1=_column1;"
-    )
-    assert azure_query(
-        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
-    ).splitlines() == ["Elizabeth"]
-
-
-def test_azure_partitioning_without_setting(cluster):
-    # type: (ClickHouseCluster) -> None
-    node = cluster.instances["node"]  # type: ClickHouseInstance
-    table_format = "column1 String, column2 String"
-    values_1 = f"('Elizabeth', 'Gordon')"
-    values_2 = f"('Emilia', 'Gregor')"
-    path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
-
-    azure_query(
-        node,
-        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
-        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
-    )
-
-    query = (
-        f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
-    )
-    pattern = re.compile(
-        r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL
-    )
-
-    with pytest.raises(Exception, match=pattern):
-        azure_query(node, query, settings={"azure_blob_storage_hive_partitioning": 0})
@@ -1,87 +0,0 @@
-#!/usr/bin/env python3
-
-import pytest
-
-from helpers.client import QueryRuntimeException
-from helpers.cluster import ClickHouseCluster, is_arm
-import re
-
-from helpers.cluster import ClickHouseCluster
-
-if is_arm():
-    pytestmark = pytest.mark.skip
-
-cluster = ClickHouseCluster(__file__)
-node1 = cluster.add_instance(
-    "node1",
-    main_configs=[
-        "configs/macro_hdfs.xml",
-        "configs/schema_cache_hdfs.xml",
-        "configs/cluster_hdfs.xml",
-    ],
-    with_hdfs=True,
-)
-
-
-@pytest.fixture(scope="module")
-def started_cluster():
-    try:
-        cluster.start()
-        yield cluster
-    finally:
-        cluster.shutdown()
-
-
-def test_hdfs_partitioning_with_one_parameter(started_cluster):
-    hdfs_api = started_cluster.hdfs_api
-    hdfs_api.write_data(f"/column0=Elizabeth/parquet_1", f"Elizabeth\tGordon\n")
-    assert hdfs_api.read_data(f"/column0=Elizabeth/parquet_1") == f"Elizabeth\tGordon\n"
-
-    r = node1.query(
-        "SELECT _column0 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/parquet_1', 'TSV')",
-        settings={"hdfs_hive_partitioning": 1},
-    )
-    assert r == f"Elizabeth\n"
-
-
-def test_hdfs_partitioning_with_two_parameters(started_cluster):
-    hdfs_api = started_cluster.hdfs_api
-    hdfs_api.write_data(
-        f"/column0=Elizabeth/column1=Gordon/parquet_2", f"Elizabeth\tGordon\n"
-    )
-    assert (
-        hdfs_api.read_data(f"/column0=Elizabeth/column1=Gordon/parquet_2")
-        == f"Elizabeth\tGordon\n"
-    )
-
-    r = node1.query(
-        "SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');",
-        settings={"hdfs_hive_partitioning": 1},
-    )
-    assert r == f"Gordon\n"
-
-
-def test_hdfs_partitioning_without_setting(started_cluster):
-    hdfs_api = started_cluster.hdfs_api
-    hdfs_api.write_data(
-        f"/column0=Elizabeth/column1=Gordon/parquet_2", f"Elizabeth\tGordon\n"
-    )
-    assert (
-        hdfs_api.read_data(f"/column0=Elizabeth/column1=Gordon/parquet_2")
-        == f"Elizabeth\tGordon\n"
-    )
-    pattern = re.compile(
-        r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL
-    )
-
-    with pytest.raises(QueryRuntimeException, match=pattern):
-        node1.query(
-            f"SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');",
-            settings={"hdfs_hive_partitioning": 0},
-        )
-
-
-if __name__ == "__main__":
-    cluster.start()
-    input("Cluster created, press any key to destroy...")
-    cluster.shutdown()
@@ -5,6 +5,7 @@ import json
 import logging
 import os
 import io
+import re
 import random
 import threading
 import time
@@ -1462,3 +1463,112 @@ def test_insert_create_new_file(cluster):
     assert TSV(res) == TSV(
         "test_create_new_file.csv\t1\ntest_create_new_file.1.csv\t2\n"
     )
+
+
+def test_hive_partitioning_with_one_parameter(cluster):
+    # type: (ClickHouseCluster) -> None
+    node = cluster.instances["node"]  # type: ClickHouseInstance
+    table_format = "column1 String, column2 String"
+    values = f"('Elizabeth', 'Gordon')"
+    path = "a/column1=Elizabeth/sample.csv"
+
+    azure_query(
+        node,
+        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
+        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}",
+    )
+
+    query = (
+        f"SELECT column1, column2, _file, _path, _column1 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}')"
+    )
+    assert azure_query(
+        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
+    ).splitlines() == [
+        "Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth".format(
+            bucket="cont", max_path=path
+        )
+    ]
+
+    query = (
+        f"SELECT column2 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
+    )
+    assert azure_query(
+        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
+    ).splitlines() == ["Gordon"]
+
+
+def test_hive_partitioning_with_two_parameters(cluster):
+    # type: (ClickHouseCluster) -> None
+    node = cluster.instances["node"]  # type: ClickHouseInstance
+    table_format = "column1 String, column2 String"
+    values_1 = f"('Elizabeth', 'Gordon')"
+    values_2 = f"('Emilia', 'Gregor')"
+    path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
+
+    azure_query(
+        node,
+        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
+        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
+    )
+
+    query = (
+        f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
+    )
+    assert azure_query(
+        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
+    ).splitlines() == [
+        "Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth\tGordon".format(
+            bucket="cont", max_path=path
+        )
+    ]
+
+    query = (
+        f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2;"
+    )
+    assert azure_query(
+        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
+    ).splitlines() == ["Elizabeth"]
+
+    query = (
+        f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2 AND column1=_column1;"
+    )
+    assert azure_query(
+        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
+    ).splitlines() == ["Elizabeth"]
+
+
+def test_hive_partitioning_without_setting(cluster):
+    # type: (ClickHouseCluster) -> None
+    node = cluster.instances["node"]  # type: ClickHouseInstance
+    table_format = "column1 String, column2 String"
+    values_1 = f"('Elizabeth', 'Gordon')"
+    values_2 = f"('Emilia', 'Gregor')"
+    path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
+
+    azure_query(
+        node,
+        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
+        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
+    )
+
+    query = (
+        f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
+    )
+    pattern = re.compile(
+        r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL
+    )
+
+    with pytest.raises(Exception, match=pattern):
+        azure_query(node, query, settings={"azure_blob_storage_hive_partitioning": 0})
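The tests added above rely on Hive-style layouts, where each key=value directory segment of the blob path is surfaced as a virtual _key column (for example, a/column1=Elizabeth/column2=Gordon/sample.csv yields _column1 = 'Elizabeth' and _column2 = 'Gordon'). A small, self-contained sketch of that path convention, for illustration only and not part of the commit:

def parse_hive_partitions(path):
    # Map Hive-style key=value path segments to the values the tests
    # expect to see in the corresponding _key virtual columns.
    partitions = {}
    for segment in path.split("/"):
        key, sep, value = segment.partition("=")
        if sep:
            partitions[key] = value
    return partitions


assert parse_hive_partitions("a/column1=Elizabeth/column2=Gordon/sample.csv") == {
    "column1": "Elizabeth",
    "column2": "Gordon",
}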
@@ -3,6 +3,7 @@ import os
 import pytest
 import time
 from helpers.cluster import ClickHouseCluster, is_arm
+from helpers.client import QueryRuntimeException
 from helpers.test_tools import TSV
 from pyhdfs import HdfsClient

@@ -1180,6 +1181,54 @@ def test_respect_object_existence_on_partitioned_write(started_cluster):
     assert int(result) == 44


+def test_hive_partitioning_with_one_parameter(started_cluster):
+    hdfs_api = started_cluster.hdfs_api
+    hdfs_api.write_data(f"/column0=Elizabeth/parquet_1", f"Elizabeth\tGordon\n")
+    assert hdfs_api.read_data(f"/column0=Elizabeth/parquet_1") == f"Elizabeth\tGordon\n"
+
+    r = node1.query(
+        "SELECT _column0 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/parquet_1', 'TSV')",
+        settings={"hdfs_hive_partitioning": 1},
+    )
+    assert r == f"Elizabeth\n"
+
+
+def test_hive_partitioning_with_two_parameters(started_cluster):
+    hdfs_api = started_cluster.hdfs_api
+    hdfs_api.write_data(
+        f"/column0=Elizabeth/column1=Gordon/parquet_2", f"Elizabeth\tGordon\n"
+    )
+    assert (
+        hdfs_api.read_data(f"/column0=Elizabeth/column1=Gordon/parquet_2")
+        == f"Elizabeth\tGordon\n"
+    )
+
+    r = node1.query(
+        "SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');",
+        settings={"hdfs_hive_partitioning": 1},
+    )
+    assert r == f"Gordon\n"
+
+
+def test_hive_partitioning_without_setting(started_cluster):
+    hdfs_api = started_cluster.hdfs_api
+    hdfs_api.write_data(
+        f"/column0=Elizabeth/column1=Gordon/parquet_2", f"Elizabeth\tGordon\n"
+    )
+    assert (
+        hdfs_api.read_data(f"/column0=Elizabeth/column1=Gordon/parquet_2")
+        == f"Elizabeth\tGordon\n"
+    )
+    pattern = re.compile(
+        r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL
+    )
+
+    with pytest.raises(QueryRuntimeException, match=pattern):
+        node1.query(
+            f"SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');",
+            settings={"hdfs_hive_partitioning": 0},
+        )
+
 if __name__ == "__main__":
     cluster.start()
     input("Cluster created, press any key to destroy...")