Fix remaining integration test

This commit is contained in:
kssenii 2024-04-26 13:35:18 +02:00
parent 193ff63f87
commit 18e4c0f1da
6 changed files with 18 additions and 16 deletions

View File

@ -447,7 +447,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::s
ObjectMetadata result;
result.size_bytes = object_info.size;
result.last_modified = object_info.last_modification_time;
result.last_modified = Poco::Timestamp::fromEpochTime(object_info.last_modification_time);
result.attributes = object_info.metadata;
return result;
@ -462,7 +462,7 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons
ObjectMetadata result;
result.size_bytes = object_info.size;
result.last_modified = object_info.last_modification_time;
result.last_modified = Poco::Timestamp::fromEpochTime(object_info.last_modification_time);
result.attributes = object_info.metadata;
return result;

View File

@ -53,7 +53,7 @@ namespace
const auto & result = outcome.GetResult();
ObjectInfo object_info;
object_info.size = static_cast<size_t>(result.GetContentLength());
object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
object_info.last_modification_time = result.GetLastModified().Seconds();
if (with_metadata)
object_info.metadata = result.GetMetadata();

View File

@ -116,7 +116,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
}
// if (file_size != 0 && file_offset >= file_size)
// {
// LOG_TEST(log, "KSSENII 1 2");
// return false;
// }

View File

@ -67,11 +67,11 @@ std::optional<ColumnsDescription> ReadBufferIterator::tryGetColumnsFromCache(
auto get_last_mod_time = [&] -> std::optional<time_t>
{
if (object_info->metadata)
return object_info->metadata->last_modified.epochMicroseconds();
return object_info->metadata->last_modified.epochTime();
else
{
object_info->metadata = object_storage->getObjectMetadata(object_info->relative_path);
return object_info->metadata->last_modified.epochMicroseconds();
return object_info->metadata->last_modified.epochTime();
}
};

View File

@ -76,6 +76,11 @@ StorageObjectStorageSource::~StorageObjectStorageSource()
create_reader_pool->wait();
}
void StorageObjectStorageSource::setKeyCondition(const ActionsDAGPtr & filter_actions_dag, ContextPtr context_)
{
setKeyConditionImpl(filter_actions_dag, context_, read_from_format_info.format_header);
}
std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSource::createFileIterator(
ConfigurationPtr configuration,
ObjectStoragePtr object_storage,
@ -213,9 +218,11 @@ std::optional<size_t> StorageObjectStorageSource::tryGetNumRowsFromCache(const O
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
return object_info->metadata
? object_info->metadata->last_modified.epochMicroseconds()
: 0;
if (object_info->metadata)
{
return object_info->metadata->last_modified.epochTime();
}
return std::nullopt;
};
return schema_cache.tryGetNumRows(cache_key, get_last_mod_time);
}
@ -260,7 +267,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
const auto max_parsing_threads = need_only_count ? std::optional<size_t>(1) : std::nullopt;
read_buf = createReadBuffer(object_info->relative_path, object_info->metadata->size_bytes);
LOG_TEST(&Poco::Logger::get("KSSENII"), "KSSENII HEADER: {}", read_from_format_info.format_header.dumpStructure());
auto input_format = FormatFactory::instance().getInput(
configuration->format, *read_buf, read_from_format_info.format_header,
getContext(), max_block_size, format_settings, max_parsing_threads,
@ -354,7 +360,7 @@ ObjectInfoPtr StorageObjectStorageSource::IIterator::next(size_t processor)
if (object_info)
{
LOG_TEST(&Poco::Logger::get("KeysIterator"), "Next key: {}", object_info->relative_path);
LOG_TEST(logger, "Next key: {}", object_info->relative_path);
}
return object_info;

View File

@ -38,10 +38,7 @@ public:
String getName() const override { return name; }
void setKeyCondition(const ActionsDAGPtr & filter_actions_dag, ContextPtr context_) override
{
setKeyConditionImpl(filter_actions_dag, context_, read_from_format_info.format_header);
}
void setKeyCondition(const ActionsDAGPtr & filter_actions_dag, ContextPtr context_) override;
Chunk generate() override;
@ -65,11 +62,11 @@ protected:
const bool need_only_count;
const ReadFromFormatInfo read_from_format_info;
const std::shared_ptr<ThreadPool> create_reader_pool;
ColumnsDescription columns_desc;
std::shared_ptr<IIterator> file_iterator;
SchemaCache & schema_cache;
bool initialized = false;
size_t total_rows_in_file = 0;
LoggerPtr log = getLogger("StorageObjectStorageSource");