This commit is contained in:
avogar 2023-08-22 12:55:00 +00:00
parent c0bdd0e00b
commit 7f9e81d504
6 changed files with 43 additions and 18 deletions

View File

@ -1302,18 +1302,14 @@ try
if (structure.empty())
{
ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &)
{
auto file = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
auto file = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
/// stdin must be seekable
auto res = lseek(file->getFD(), 0, SEEK_SET);
if (-1 == res)
throwFromErrno("Input must be seekable file (it will be read twice).", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
return file;
};
/// stdin must be seekable
auto res = lseek(file->getFD(), 0, SEEK_SET);
if (-1 == res)
throwFromErrno("Input must be seekable file (it will be read twice).", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
SingleReadBufferIterator read_buffer_iterator(std::move(file));
schema_columns = readSchemaFromFormat(input_format, {}, read_buffer_iterator, false, context_const);
}
else

View File

@ -659,6 +659,7 @@ class IColumn;
M(SetOperationMode, except_default_mode, SetOperationMode::ALL, "Set default mode in EXCEPT query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without mode will throw exception.", 0) \
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
M(Bool, optimize_group_by_constant_keys, true, "Optimize GROUP BY when all keys in block are constant", 0) \
M(Bool, legacy_column_name_of_tuple_literal, false, "List all names of element of large tuple literals in their column names instead of hash. This settings exists only for compatibility reasons. It makes sense to set to 'true', while doing rolling update of cluster from version lower than 21.7 to higher.", 0) \
\
M(Bool, query_plan_enable_optimizations, true, "Apply optimizations to query plan", 0) \

View File

@ -6,6 +6,8 @@
namespace DB
{
/// Source that generates chunks with constant columns and
/// size up to max_block_size with total rows total_num_rows.
class ConstChunkGenerator : public ISource
{
public:

View File

@ -547,10 +547,20 @@ bool HDFSSource::initialize()
continue;
current_path = path_with_info.path;
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path);
std::optional<size_t> file_size;
if (!path_with_info.info)
{
auto builder = createHDFSBuilder(uri_without_path + "/", getContext()->getGlobalContext()->getConfigRef());
auto fs = createHDFSFS(builder.get());
auto * hdfs_info = hdfsGetPathInfo(fs.get(), path_from_uri.c_str());
if (hdfs_info)
path_with_info.info = StorageHDFS::PathInfo{hdfs_info->mLastMod, static_cast<size_t>(hdfs_info->mSize)};
}
if (path_with_info.info)
file_size = path_with_info.info->size;
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path);
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
auto impl = std::make_unique<ReadBufferFromHDFS>(
@ -570,6 +580,11 @@ bool HDFSSource::initialize()
std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(path_with_info) : std::nullopt;
if (num_rows_from_cache)
{
/// We should not return single chunk with all number of rows,
/// because there is a chance that this chunk will be materialized later
/// (it can cause memory problems even with default values in columns or when virtual columns are requested).
/// Instead, we use special ConstChunkGenerator that will generate chunks
/// with max_block_size rows until total number of rows is reached.
auto source = std::make_shared<ConstChunkGenerator>(block_for_format, *num_rows_from_cache, max_block_size);
builder.init(Pipe(source));
}

View File

@ -1183,9 +1183,13 @@ StorageAzureBlobSource::ReaderHolder StorageAzureBlobSource::createReader()
std::shared_ptr<ISource> source;
std::unique_ptr<ReadBuffer> read_buf;
std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(path_with_metadata) : std::nullopt;
LOG_DEBUG(&Poco::Logger::get("StorageAzureBlobSource"), "need_only_count: {}", need_only_count);
if (num_rows_from_cache)
{
/// We should not return single chunk with all number of rows,
/// because there is a chance that this chunk will be materialized later
/// (it can cause memory problems even with default values in columns or when virtual columns are requested).
/// Instead, we use special ConstChunkGenerator that will generate chunks
/// with max_block_size rows until total number of rows is reached.
source = std::make_shared<ConstChunkGenerator>(sample_block, *num_rows_from_cache, max_block_size);
builder.init(Pipe(source));
}
@ -1386,13 +1390,15 @@ std::optional<ColumnsDescription> StorageAzureBlob::tryGetColumnsFromCache(
auto & schema_cache = getSchemaCache(ctx);
for (auto it = begin; it < end; ++it)
{
auto get_last_mod_time = [&] -> time_t
auto get_last_mod_time = [&] -> std::optional<time_t>
{
return it->metadata.last_modified->epochTime();
if (it->metadata.last_modified)
return it->metadata.last_modified->epochTime();
return std::nullopt;
};
auto host_and_bucket = fs::path(configuration.connection_url) / configuration.container;
String source = host_and_bucket / it->relative_path;
auto host_and_bucket = configuration.connection_url + '/' + configuration.container;
String source = host_and_bucket + '/' + it->relative_path;
auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, ctx);
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
@ -1411,10 +1417,10 @@ void StorageAzureBlob::addColumnsToCache(
const String & format_name,
const ContextPtr & ctx)
{
auto host_and_bucket = fs::path(configuration.connection_url) / configuration.container;
auto host_and_bucket = configuration.connection_url + '/' + configuration.container;
Strings sources;
sources.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return fs::path(host_and_bucket) / elem.relative_path; });
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket + '/' + elem.relative_path; });
auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
auto & schema_cache = getSchemaCache(ctx);
schema_cache.addManyColumns(cache_keys, columns);

View File

@ -580,6 +580,11 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(key_with_info) : std::nullopt;
if (num_rows_from_cache)
{
/// We should not return single chunk with all number of rows,
/// because there is a chance that this chunk will be materialized later
/// (it can cause memory problems even with default values in columns or when virtual columns are requested).
/// Instead, we use special ConstChunkGenerator that will generate chunks
/// with max_block_size rows until total number of rows is reached.
source = std::make_shared<ConstChunkGenerator>(sample_block, *num_rows_from_cache, max_block_size);
builder.init(Pipe(source));
}