Improve schema inference

Antonio Andelic 2023-08-28 13:10:13 +00:00
parent 5a0c2ca108
commit 9b99f25d75
11 changed files with 220 additions and 117 deletions
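
The heart of the change: schema inference over archives now consults the per-file schema cache. Every processed "<archive>::<file>" path is checked against the cache before the file is parsed, keyed by path, format, and format settings, with the archive's modification time used to decide freshness. A minimal sketch of that lookup, mirroring the check_schema_cache lambda in the StorageFile hunks below (checkSchemaCache and archive_path are illustrative names; the other calls appear verbatim in the diff):

// Sketch of the cache lookup introduced by this commit. Assumes the
// surrounding state of ReadBufferFromArchiveIterator: context, format,
// format_settings, and the path of the archive currently being scanned.
std::optional<ColumnsDescription> checkSchemaCache(const std::string & full_path)
{
    // Respect the setting that gates schema-cache usage for the file engine.
    if (!context->getSettingsRef().schema_inference_use_cache_for_file)
        return std::nullopt;

    auto & schema_cache = StorageFile::getSchemaCache(context);

    // Last-modification probe the cache uses to decide whether the
    // entry for this path is still fresh.
    auto get_last_mod_time = [&]() -> std::optional<time_t>
    {
        struct stat file_stat{};
        if (0 != stat(archive_path.c_str(), &file_stat))
            return std::nullopt;
        return file_stat.st_mtime;
    };

    auto cache_key = getKeyForSchemaCache(full_path, format, format_settings, context);
    return schema_cache.tryGetColumns(cache_key, get_last_mod_time);
}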


@ -564,7 +564,6 @@ Pipe ShellCommandSourceCoordinator::createPipe(
ContextPtr context,
const ShellCommandSourceConfiguration & source_configuration)
{
std::cout << "Creating pipe " << std::endl;
ShellCommand::Config command_config(command);
command_config.arguments = arguments;
for (size_t i = 1; i < input_pipes.size(); ++i)


@ -443,86 +443,6 @@ Strings StorageFile::getPathsList(const String & table_path, const String & user
namespace
{
/**
read_buffer_iterator = [&,
read_files = std::unordered_set<std::string>(),
archive_it = archive_info->paths_to_archive.begin(),
first = true](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
std::unique_ptr<ReadBuffer> read_buf;
struct stat file_stat;
while (true)
{
if (archive_it == archive_info->paths_to_archive.end())
{
if (first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. You must specify table structure manually",
format);
return nullptr;
}
file_stat = getFileStat(*archive_it, false, -1, "File");
if (file_stat.st_size == 0)
{
if (context->getSettingsRef().engine_file_skip_empty_files)
{
++archive_it;
continue;
}
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because the archive {} is empty. "
"You must specify table structure manually",
format,
*archive_it);
}
auto archive_reader = createArchiveReader(*archive_it);
if (archive_info->filter)
{
auto file_enumerator = archive_reader->firstFile();
while (true)
{
if (!file_enumerator)
{
++archive_it;
read_files.clear();
break;
}
const auto & filename = file_enumerator->getFileName();
if (read_files.contains(filename) || !archive_info->filter(filename))
{
file_enumerator->nextFile();
continue;
}
read_files.insert(filename);
read_buf = archive_reader->readFile(std::move(file_enumerator));
break;
}
if (!read_buf)
continue;
}
else
{
read_buf = archive_reader->readFile(archive_info->path_in_archive, false);
++archive_it;
}
break;
}
first = false;
return read_buf;
};
*/
struct ReadBufferFromFileIterator : public IReadBufferIterator, WithContext
{
public:
@ -589,10 +509,12 @@ namespace
ReadBufferFromArchiveIterator(
const StorageFile::ArchiveInfo & archive_info_,
const String & format_,
const std::optional<FormatSettings> & format_settings_,
ContextPtr context_)
: WithContext(context_)
, archive_info(archive_info_)
, format(format_)
, format_settings(format_settings_)
{
}
@ -602,7 +524,7 @@ namespace
struct stat file_stat;
while (true)
{
if (current_archive_index == archive_info.paths_to_archive.size())
if (current_archive_index == archive_info.paths_to_archives.size())
{
if (is_first)
throw Exception(
@ -613,7 +535,7 @@ namespace
return nullptr;
}
const auto & archive = archive_info.paths_to_archive[current_archive_index];
const auto & archive = archive_info.paths_to_archives[current_archive_index];
file_stat = getFileStat(archive, false, -1, "File");
if (file_stat.st_size == 0)
{
@ -630,10 +552,42 @@ namespace
auto archive_reader = createArchiveReader(archive);
auto check_schema_cache = [&](const std::string & full_path) -> std::optional<ColumnsDescription>
{
auto context = getContext();
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_file)
return std::nullopt;
auto & schema_cache = StorageFile::getSchemaCache(context);
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
if (0 != stat(archive_reader->getPath().c_str(), &file_stat))
return std::nullopt;
return file_stat.st_mtime;
};
auto cache_key = getKeyForSchemaCache(full_path, format, format_settings, context);
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
return std::nullopt;
};
if (archive_info.readSingleFile())
{
read_buf = archive_reader->readFile(archive_info.path_in_archive, false);
++current_archive_index;
if (!read_buf)
continue;
const auto & full_path = processed_files.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), archive_info.path_in_archive));
columns_from_cache = check_schema_cache(full_path);
if (columns_from_cache)
return nullptr;
}
else
{
@ -655,14 +609,16 @@ namespace
archive);
}
const auto & filename = file_enumerator->getFileName();
while (!read_files.contains(filename) || !archive_info.filter(file_enumerator->getFileName()))
const auto * filename = &file_enumerator->getFileName();
while (read_files.contains(*filename) || !archive_info.filter(*filename))
{
if (!file_enumerator->nextFile())
{
archive_reader = nullptr;
break;
}
filename = &file_enumerator->getFileName();
}
if (!archive_reader)
@ -670,9 +626,15 @@ namespace
++current_archive_index;
continue;
}
read_files.insert(filename);
read_files.insert(*filename);
read_buf = archive_reader->readFile(std::move(file_enumerator));
const auto & full_path = processed_files.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), *filename));
columns_from_cache = check_schema_cache(full_path);
if (columns_from_cache)
return nullptr;
}
break;
@ -686,16 +648,18 @@ namespace
{
}
std::vector<std::string> processed_files;
std::optional<ColumnsDescription> columns_from_cache;
private:
const StorageFile::ArchiveInfo & archive_info;
size_t current_archive_index = 0;
std::unordered_map<size_t, std::unordered_set<std::string>> read_files_from_archive;
bool is_first = true;
String format;
const std::optional<FormatSettings> & format_settings;
};
}
@ -742,7 +706,7 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
return ColumnsDescription(DistributedAsyncInsertSource(paths[0]).getOutputs().front().getHeader().getNamesAndTypesList());
}
if (((archive_info && archive_info->paths_to_archive.empty()) || (!archive_info && paths.empty()))
if (((archive_info && archive_info->paths_to_archives.empty()) || (!archive_info && paths.empty()))
&& !FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format))
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
@ -752,16 +716,72 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
ColumnsDescription columns;
if (archive_info)
{
ReadBufferFromArchiveIterator read_buffer_iterator(*archive_info, format, context);
columns = readSchemaFromFormat(
format,
format_settings,
read_buffer_iterator,
/*retry=*/archive_info->paths_to_archive.size() > 1 || !archive_info->readSingleFile(),
context);
std::vector<std::string> paths_for_schema_cache;
std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().schema_inference_use_cache_for_file)
addColumnsToCache(archive_info ? archive_info->paths_to_archive : paths, columns, format, format_settings, context);
{
paths_for_schema_cache.reserve(archive_info->paths_to_archives.size());
struct stat file_stat{};
for (const auto & archive : archive_info->paths_to_archives)
{
const auto & full_path = paths_for_schema_cache.emplace_back(fmt::format("{}::{}", archive, archive_info->path_in_archive));
if (!columns_from_cache)
{
auto & schema_cache = getSchemaCache(context);
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
if (0 != stat(archive.c_str(), &file_stat))
return std::nullopt;
return file_stat.st_mtime;
};
auto cache_key = getKeyForSchemaCache(full_path, format, format_settings, context);
columns_from_cache = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
}
}
}
if (columns_from_cache)
{
columns = std::move(*columns_from_cache);
}
else
{
ReadBufferFromArchiveIterator read_buffer_iterator(*archive_info, format, format_settings, context);
try
{
columns = readSchemaFromFormat(
format,
format_settings,
read_buffer_iterator,
/*retry=*/archive_info->paths_to_archives.size() > 1 || !archive_info->readSingleFile(),
context);
}
catch (const DB::Exception & e)
{
/// maybe we found something in cache while iterating files
if (e.code() == ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE)
{
if (read_buffer_iterator.columns_from_cache)
columns = std::move(*read_buffer_iterator.columns_from_cache);
else
throw;
}
else
{
throw;
}
}
for (auto & file : read_buffer_iterator.processed_files)
paths_for_schema_cache.push_back(std::move(file));
}
if (context->getSettingsRef().schema_inference_use_cache_for_file)
addColumnsToCache(paths_for_schema_cache, columns, format, format_settings, context);
}
else
{
@ -780,7 +800,7 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
}
if (context->getSettingsRef().schema_inference_use_cache_for_file)
addColumnsToCache(archive_info ? archive_info->paths_to_archive : paths, columns, format, format_settings, context);
addColumnsToCache(archive_info ? archive_info->paths_to_archives : paths, columns, format, format_settings, context);
}
return columns;
@ -942,7 +962,7 @@ public:
String next()
{
const auto & fs = readFromArchive() ? archive_info->paths_to_archive : files;
const auto & fs = readFromArchive() ? archive_info->paths_to_archives : files;
auto current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= fs.size())
@ -1104,7 +1124,7 @@ public:
archive_reader = createArchiveReader(archive);
filename = files_iterator->getFileNameInArchive();
read_buf = archive_reader->readFile(current_path, /*throw_on_not_found=*/false);
read_buf = archive_reader->readFile(filename, /*throw_on_not_found=*/false);
if (!read_buf)
continue;
}
@ -1210,7 +1230,7 @@ public:
size_t file_num = 0;
if (storage->archive_info)
file_num = storage->archive_info->paths_to_archive.size();
file_num = storage->archive_info->paths_to_archives.size();
else
file_num = storage->paths.size();
@ -1351,7 +1371,7 @@ Pipe StorageFile::read(
const std::string * p;
if (archive_info.has_value())
p = archive_info->paths_to_archive.data();
p = archive_info->paths_to_archives.data();
else
p = paths.data();
@ -1373,7 +1393,7 @@ Pipe StorageFile::read(
size_t files_to_read = 0;
if (archive_info)
files_to_read = archive_info->paths_to_archive.size();
files_to_read = archive_info->paths_to_archives.size();
else
files_to_read = paths.size();
@ -2010,6 +2030,7 @@ StorageFile::ArchiveInfo StorageFile::getArchiveInfo(
)
{
ArchiveInfo archive_info;
archive_info.path_in_archive = file_in_archive;
if (file_in_archive.find_first_of("*?{") != std::string::npos)
{
@ -2024,12 +2045,8 @@ StorageFile::ArchiveInfo StorageFile::getArchiveInfo(
return re2::RE2::FullMatch(p, *matcher);
};
}
else
{
archive_info.path_in_archive = file_in_archive;
}
archive_info.paths_to_archive = getPathsList(path_to_archive, user_files_path, context, total_bytes_to_read);
archive_info.paths_to_archives = getPathsList(path_to_archive, user_files_path, context, total_bytes_to_read);
return archive_info;
}
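
Taken together, the getTableStructureFromFile hunks above form a two-level fallback: cached schemas are tried for every archive path up front; the iterator keeps checking the cache for each file it visits, stashing a hit in columns_from_cache and returning nullptr; and the caller recovers that stashed schema when format-based inference itself fails. A condensed sketch of the caller side, using only names from the diff (just the control flow is abbreviated):

ReadBufferFromArchiveIterator read_buffer_iterator(*archive_info, format, format_settings, context);
try
{
    columns = readSchemaFromFormat(
        format,
        format_settings,
        read_buffer_iterator,
        /*retry=*/archive_info->paths_to_archives.size() > 1 || !archive_info->readSingleFile(),
        context);
}
catch (const DB::Exception & e)
{
    // The iterator may have found a cached schema while scanning archives
    // even though inference from the file contents gave up.
    if (e.code() != ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE
        || !read_buffer_iterator.columns_from_cache)
        throw;
    columns = std::move(*read_buffer_iterator.columns_from_cache);
}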


@ -86,13 +86,13 @@ public:
struct ArchiveInfo
{
std::vector<std::string> paths_to_archive;
std::vector<std::string> paths_to_archives;
std::string path_in_archive; // used when reading a single file from archive
IArchiveReader::NameFilter filter; // used when files inside archive are defined with a glob
IArchiveReader::NameFilter filter = {}; // used when files inside archive are defined with a glob
bool readSingleFile() const
{
return !path_in_archive.empty();
return !filter;
}
};
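
Note the changed meaning of readSingleFile(): getArchiveInfo now stores path_in_archive unconditionally (see its hunk above), so an empty path can no longer signal "read one file"; instead, the presence of a filter marks glob mode. A small sketch, assuming NameFilter accepts a lambda and using a simplified stand-in for the diff's RE2 matcher:

StorageFile::ArchiveInfo info;
info.path_in_archive = "data{1..3}.csv";    // now kept verbatim, even when it is a glob
info.filter = [](const std::string & name)  // only assigned when the path is a glob
{
    // Illustrative stand-in for re2::RE2::FullMatch against the translated glob.
    return name.starts_with("data") && name.ends_with(".csv");
};
// With a filter set, archives are enumerated file by file and matched;
// without one, the single path_in_archive entry is read directly.
bool reads_one_file = info.readSingleFile();  // false: a filter is present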


@ -8,6 +8,7 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
function read_archive_file() {
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/$1') ORDER BY 1, 2"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file('${user_files_path}/$1') ORDER BY 1, 2"
$CLICKHOUSE_CLIENT --query "DESC file('${user_files_path}/$1')"
$CLICKHOUSE_CLIENT --query "CREATE TABLE 02661_archive_table Engine=File('CSV', '${user_files_path}/$1')"
$CLICKHOUSE_CLIENT --query "SELECT * FROM 02661_archive_table ORDER BY 1, 2"
$CLICKHOUSE_CLIENT --query "DROP TABLE 02661_archive_table"
@ -16,15 +17,17 @@ function read_archive_file() {
function run_archive_test() {
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS 02661_archive_table"
FILE_PREFIX="${CLICKHOUSE_TEST_UNIQUE_NAME}_$1_"
extension_without_dot=$(echo $1 | sed -e 's/\.//g')
FILE_PREFIX="02661_read_from_archive_${CLICKHOUSE_DATABASE}_$extension_without_dot"
user_files_path=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
user_files_path=$(clickhouse-client --query "select _path, _file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep -o "/[^[:space:]]*nonexist.txt" | awk '{gsub("/nonexist.txt","",$1); print $1}')
touch ${FILE_PREFIX}_data0.csv
echo -e "1,2\n3,4" > ${FILE_PREFIX}_data1.csv
echo -e "5,6\n7,8" > ${FILE_PREFIX}_data2.csv
echo -e "9,10\n11,12" > ${FILE_PREFIX}_data3.csv
eval "$2 ${user_files_path}/${FILE_PREFIX}_archive1.$1 ${FILE_PREFIX}_data1.csv ${FILE_PREFIX}_data2.csv > /dev/null"
eval "$2 ${user_files_path}/${FILE_PREFIX}_archive1.$1 ${FILE_PREFIX}_data0.csv ${FILE_PREFIX}_data1.csv ${FILE_PREFIX}_data2.csv > /dev/null"
eval "$2 ${user_files_path}/${FILE_PREFIX}_archive2.$1 ${FILE_PREFIX}_data1.csv ${FILE_PREFIX}_data3.csv > /dev/null"
eval "$2 ${user_files_path}/${FILE_PREFIX}_archive3.$1 ${FILE_PREFIX}_data2.csv ${FILE_PREFIX}_data3.csv > /dev/null"
@ -41,10 +44,10 @@ function run_archive_test() {
echo "archive* {2..3}.csv"
read_archive_file "${FILE_PREFIX}_archive*.$1 :: ${FILE_PREFIX}_data{2..3}.csv"
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/${FILE_PREFIX}_archive1.$1::nonexistent.csv')" 2>&1 | grep -q "CANNOT_UNPACK_ARCHIVE" && echo "OK" || echo "FAIL"
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/${FILE_PREFIX}_archive3.$1::{2..3}.csv')" 2>&1 | grep -q "CANNOT_UNPACK_ARCHIVE" && echo "OK" || echo "FAIL"
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/${FILE_PREFIX}_archive1.$1::nonexistent.csv')" 2>&1 | grep -q "CANNOT_EXTRACT_TABLE_STRUCTURE" && echo "OK" || echo "FAIL"
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/${FILE_PREFIX}_archive3.$1::{2..3}.csv')" 2>&1 | grep -q "CANNOT_EXTRACT_TABLE_STRUCTURE" && echo "OK" || echo "FAIL"
rm ${user_files_path}/${FILE_PREFIX}_archive{1..3}.$1
rm ${FILE_PREFIX}_data{1..3}.csv
rm ${FILE_PREFIX}_data{0..3}.csv
}


@ -3,6 +3,8 @@ archive1 data1.csv
3 4
1 2
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
3 4
archive{1..2} data1.csv
@ -14,6 +16,8 @@ archive{1..2} data1.csv
1 2
3 4
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -31,6 +35,8 @@ archive{1,2} data{1,3}.csv
3 4
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -46,6 +52,8 @@ archive3 data*.csv
7 8
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
7 8
9 10
@ -75,6 +83,8 @@ archive* *.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -104,6 +114,8 @@ archive* {2..3}.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
5 6
7 8


@ -3,6 +3,8 @@ archive1 data1.csv
3 4
1 2
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
3 4
archive{1..2} data1.csv
@ -14,6 +16,8 @@ archive{1..2} data1.csv
1 2
3 4
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -31,6 +35,8 @@ archive{1,2} data{1,3}.csv
3 4
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -46,6 +52,8 @@ archive3 data*.csv
7 8
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
7 8
9 10
@ -75,6 +83,8 @@ archive* *.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -104,6 +114,8 @@ archive* {2..3}.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
5 6
7 8


@ -3,6 +3,8 @@ archive1 data1.csv
3 4
1 2
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
3 4
archive{1..2} data1.csv
@ -14,6 +16,8 @@ archive{1..2} data1.csv
1 2
3 4
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -31,6 +35,8 @@ archive{1,2} data{1,3}.csv
3 4
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -46,6 +52,8 @@ archive3 data*.csv
7 8
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
7 8
9 10
@ -75,6 +83,8 @@ archive* *.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -104,6 +114,8 @@ archive* {2..3}.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
5 6
7 8


@ -3,6 +3,8 @@ archive1 data1.csv
3 4
1 2
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
3 4
archive{1..2} data1.csv
@ -14,6 +16,8 @@ archive{1..2} data1.csv
1 2
3 4
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -31,6 +35,8 @@ archive{1,2} data{1,3}.csv
3 4
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -46,6 +52,8 @@ archive3 data*.csv
7 8
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
7 8
9 10
@ -75,6 +83,8 @@ archive* *.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -104,6 +114,8 @@ archive* {2..3}.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
5 6
7 8


@ -3,6 +3,8 @@ archive1 data1.csv
3 4
1 2
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
3 4
archive{1..2} data1.csv
@ -14,6 +16,8 @@ archive{1..2} data1.csv
1 2
3 4
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -31,6 +35,8 @@ archive{1,2} data{1,3}.csv
3 4
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -46,6 +52,8 @@ archive3 data*.csv
7 8
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
7 8
9 10
@ -75,6 +83,8 @@ archive* *.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -104,6 +114,8 @@ archive* {2..3}.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
5 6
7 8


@ -3,6 +3,8 @@ archive1 data1.csv
3 4
1 2
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
3 4
archive{1..2} data1.csv
@ -14,6 +16,8 @@ archive{1..2} data1.csv
1 2
3 4
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -31,6 +35,8 @@ archive{1,2} data{1,3}.csv
3 4
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -46,6 +52,8 @@ archive3 data*.csv
7 8
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
7 8
9 10
@ -75,6 +83,8 @@ archive* *.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -104,6 +114,8 @@ archive* {2..3}.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
5 6
7 8


@ -3,6 +3,8 @@ archive1 data1.csv
3 4
1 2
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
3 4
archive{1..2} data1.csv
@ -14,6 +16,8 @@ archive{1..2} data1.csv
1 2
3 4
3 4
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -31,6 +35,8 @@ archive{1,2} data{1,3}.csv
3 4
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -46,6 +52,8 @@ archive3 data*.csv
7 8
9 10
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
7 8
9 10
@ -75,6 +83,8 @@ archive* *.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
1 2
1 2
3 4
@ -104,6 +114,8 @@ archive* {2..3}.csv
9 10
11 12
11 12
c1 Nullable(Int64)
c2 Nullable(Int64)
5 6
5 6
7 8