Fixed SQLiteSource

This commit is contained in:
Maksim Kita 2021-08-13 00:58:24 +03:00
parent 124a87684f
commit d9a59370d3
6 changed files with 39 additions and 36 deletions

View File

@ -23,32 +23,32 @@ namespace ErrorCodes
}
SQLiteSource::SQLiteSource(
SQLitePtr sqlite_db_,
const String & query_str_,
const Block & sample_block,
const UInt64 max_block_size_)
SQLitePtr sqlite_db_,
const String & query_str_,
const Block & sample_block,
const UInt64 max_block_size_)
: SourceWithProgress(sample_block.cloneEmpty())
, query_str(query_str_)
, max_block_size(max_block_size_)
, sqlite_db(std::move(sqlite_db_))
{
description.init(sample_block);
sqlite3_stmt * compiled_stmt = nullptr;
int status = sqlite3_prepare_v2(sqlite_db.get(), query_str.c_str(), query_str.size() + 1, &compiled_stmt, nullptr);
if (status != SQLITE_OK)
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Cannot prepate sqlite statement. Status: {}. Message: {}",
status, sqlite3_errstr(status));
compiled_statement = std::unique_ptr<sqlite3_stmt, StatementDeleter>(compiled_stmt, StatementDeleter());
}
Chunk SQLiteSource::generate()
{
if (!compiled_statement)
{
sqlite3_stmt * compiled_stmt = nullptr;
int status = sqlite3_prepare_v2(sqlite_db.get(), query_str.c_str(), query_str.size() + 1, &compiled_stmt, nullptr);
if (status != SQLITE_OK)
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Cannot prepate sqlite statement. Status: {}. Message: {}",
status, sqlite3_errstr(status));
compiled_statement = std::unique_ptr<sqlite3_stmt, StatementDeleter>(compiled_stmt, StatementDeleter());
}
return {};
MutableColumns columns = description.sample_block.cloneEmptyColumns();
size_t num_rows = 0;
@ -69,30 +69,30 @@ Chunk SQLiteSource::generate()
else if (status != SQLITE_ROW)
{
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Expected SQLITE_ROW status, but got status {}. Error: {}, Message: {}",
status, sqlite3_errstr(status), sqlite3_errmsg(sqlite_db.get()));
"Expected SQLITE_ROW status, but got status {}. Error: {}, Message: {}",
status, sqlite3_errstr(status), sqlite3_errmsg(sqlite_db.get()));
}
int column_count = sqlite3_column_count(compiled_statement.get());
for (const auto idx : collections::range(0, column_count))
{
const auto & sample = description.sample_block.getByPosition(idx);
if (sqlite3_column_type(compiled_statement.get(), idx) == SQLITE_NULL)
for (int column_index = 0; column_index < column_count; ++column_index)
{
if (sqlite3_column_type(compiled_statement.get(), column_index) == SQLITE_NULL)
{
insertDefaultSQLiteValue(*columns[idx], *sample.column);
columns[column_index]->insertDefault();
continue;
}
if (description.types[idx].second)
auto & [type, is_nullable] = description.types[column_index];
if (is_nullable)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, idx);
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[column_index]);
insertValue(column_nullable.getNestedColumn(), type, column_index);
column_nullable.getNullMapData().emplace_back(0);
}
else
{
insertValue(*columns[idx], description.types[idx].first, idx);
insertValue(*columns[column_index], type, column_index);
}
}
@ -100,6 +100,12 @@ Chunk SQLiteSource::generate()
break;
}
if (num_rows == 0)
{
compiled_statement.reset();
return {};
}
return Chunk(std::move(columns), num_rows);
}

View File

@ -13,8 +13,10 @@
namespace DB
{
class SQLiteSource : public SourceWithProgress
{
using SQLitePtr = std::shared_ptr<sqlite3>;
public:
@ -26,10 +28,6 @@ public:
String getName() const override { return "SQLite"; }
private:
static void insertDefaultSQLiteValue(IColumn & column, const IColumn & sample_column)
{
column.insertFrom(sample_column, 0);
}
using ValueType = ExternalResultDescription::ValueType;
@ -46,7 +44,6 @@ private:
UInt64 max_block_size;
ExternalResultDescription description;
SQLitePtr sqlite_db;
std::unique_ptr<sqlite3_stmt, StatementDeleter> compiled_statement;
};

View File

@ -31,7 +31,7 @@ void registerDictionarySourceRedis(DictionarySourceFactory & factory)
#include <IO/WriteHelpers.h>
#include "RedisBlockInputStream.h"
#include "RedisSource.h"
namespace DB

View File

@ -1,4 +1,4 @@
#include "RedisBlockInputStream.h"
#include "RedisSource.h"
#include <string>
#include <vector>

View File

@ -26,9 +26,9 @@ SRCS(
CassandraHelpers.cpp
CassandraSource.cpp
ClickHouseDictionarySource.cpp
DictionaryBlockInputStream.cpp
DictionaryBlockInputStreamBase.cpp
DictionaryFactory.cpp
DictionarySource.cpp
DictionarySourceBase.cpp
DictionarySourceFactory.cpp
DictionarySourceHelpers.cpp
DictionaryStructure.cpp
@ -57,8 +57,8 @@ SRCS(
PolygonDictionaryImplementations.cpp
PolygonDictionaryUtils.cpp
RangeHashedDictionary.cpp
RedisBlockInputStream.cpp
RedisDictionarySource.cpp
RedisSource.cpp
XDBCDictionarySource.cpp
getDictionaryConfigurationFromAST.cpp
readInvalidateQuery.cpp