3 changes: 3 additions & 0 deletions src/duckdb/extension/parquet/include/parquet_crypto.hpp
@@ -36,6 +36,9 @@ class ParquetKeys : public ObjectCacheEntry {
public:
static string ObjectType();
string GetObjectType() override;
optional_idx GetEstimatedCacheMemory() const override {
return optional_idx {};
}

private:
unordered_map<string, string> keys;

@@ -40,6 +40,7 @@ class ParquetFileMetadataCache : public ObjectCacheEntry {
public:
static string ObjectType();
string GetObjectType() override;
optional_idx GetEstimatedCacheMemory() const override;

bool IsValid(CachingFileHandle &new_handle) const;
//! Return if a cache entry is valid.
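
The new `GetEstimatedCacheMemory()` hook gives the object cache a way to account for the size of its entries; returning an empty `optional_idx`, as `ParquetKeys` does above, signals that no estimate is available. Below is a minimal sketch of how a caller might aggregate such estimates, using hypothetical stand-in types rather than DuckDB's actual ObjectCache API:

```cpp
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-ins for duckdb::optional_idx and ObjectCacheEntry,
// for illustration only.
struct OptionalIdx {
	static constexpr uint64_t INVALID = static_cast<uint64_t>(-1);
	uint64_t index = INVALID;
	bool IsValid() const { return index != INVALID; }
	uint64_t GetIndex() const { return index; }
};

struct CacheEntry {
	virtual ~CacheEntry() = default;
	// An empty result means "size unknown"; such entries are simply not counted.
	virtual OptionalIdx GetEstimatedCacheMemory() const { return OptionalIdx {}; }
};

// Sum the estimates of all entries that report one.
uint64_t EstimateCacheMemory(const std::vector<std::unique_ptr<CacheEntry>> &entries) {
	uint64_t total = 0;
	for (auto &entry : entries) {
		auto estimate = entry->GetEstimatedCacheMemory();
		if (estimate.IsValid()) {
			total += estimate.GetIndex();
		}
	}
	return total;
}
```
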
5 changes: 4 additions & 1 deletion src/duckdb/extension/parquet/include/parquet_reader.hpp
@@ -9,6 +9,7 @@
#pragma once

#include "duckdb.hpp"
#include "duckdb/common/helper.hpp"
#include "duckdb/storage/caching_file_system.hpp"
#include "duckdb/common/common.hpp"
#include "duckdb/common/encryption_functions.hpp"
@@ -206,7 +207,9 @@ class ParquetReader : public BaseFileReader {
void AddVirtualColumn(column_t virtual_column_id) override;

void GetPartitionStats(vector<PartitionStatistics> &result);
static void GetPartitionStats(const duckdb_parquet::FileMetaData &metadata, vector<PartitionStatistics> &result);
static void GetPartitionStats(const duckdb_parquet::FileMetaData &metadata, vector<PartitionStatistics> &result,
optional_ptr<ParquetColumnSchema> root_schema = nullptr,
optional_ptr<ParquetOptions> parquet_options = nullptr);
static bool MetadataCacheEnabled(ClientContext &context);
static shared_ptr<ParquetFileMetadataCache> GetMetadataCacheEntry(ClientContext &context, const OpenFileInfo &file);

5 changes: 1 addition & 4 deletions src/duckdb/extension/parquet/parquet_crypto.cpp
@@ -17,10 +17,7 @@ namespace duckdb {

ParquetKeys &ParquetKeys::Get(ClientContext &context) {
auto &cache = ObjectCache::GetObjectCache(context);
if (!cache.Get<ParquetKeys>(ParquetKeys::ObjectType())) {
cache.Put(ParquetKeys::ObjectType(), make_shared_ptr<ParquetKeys>());
}
return *cache.Get<ParquetKeys>(ParquetKeys::ObjectType());
return *cache.GetOrCreate<ParquetKeys>(ParquetKeys::ObjectType());
}

void ParquetKeys::AddKey(const string &key_name, const string &key) {
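
Collapsing the Get-then-Put sequence into one `GetOrCreate` call removes a redundant hash lookup and closes the window in which two threads could both observe a miss and insert separately. A minimal sketch of the pattern, assuming a hypothetical cache type (not DuckDB's ObjectCache implementation):

```cpp
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

class SimpleObjectCache {
public:
	// Returns the cached object for 'key', creating it atomically if absent.
	template <class T>
	std::shared_ptr<T> GetOrCreate(const std::string &key) {
		std::lock_guard<std::mutex> guard(lock);
		auto &slot = cache[key]; // single lookup; inserts an empty slot if missing
		if (!slot) {
			slot = std::make_shared<T>();
		}
		return std::static_pointer_cast<T>(slot);
	}

private:
	std::mutex lock;
	std::unordered_map<std::string, std::shared_ptr<void>> cache;
};
```
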
25 changes: 25 additions & 0 deletions src/duckdb/extension/parquet/parquet_file_metadata_cache.cpp
@@ -23,6 +23,31 @@ string ParquetFileMetadataCache::GetObjectType() {
return ObjectType();
}

optional_idx ParquetFileMetadataCache::GetEstimatedCacheMemory() const {
// Base memory consumption
idx_t memory = sizeof(*this);

if (metadata) {
const auto num_cols = metadata->schema.size();
memory += sizeof(duckdb_parquet::FileMetaData);
memory += num_cols * sizeof(duckdb_parquet::SchemaElement);
memory += metadata->row_groups.size() * sizeof(duckdb_parquet::RowGroup) +
num_cols * sizeof(duckdb_parquet::ColumnChunk);
}
if (geo_metadata) {
memory +=
sizeof(GeoParquetFileMetadata) + geo_metadata->GetColumnMeta().size() * sizeof(GeoParquetColumnMetadata);
}
if (crypto_metadata) {
memory += sizeof(FileCryptoMetaData);
}

memory += footer_size;
memory += version_tag.size();

return memory;
}

bool ParquetFileMetadataCache::IsValid(CachingFileHandle &new_handle) const {
return ExternalFileCache::IsValid(validate, version_tag, last_modified, new_handle.GetVersionTag(),
new_handle.GetLastModifiedTime());
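
The estimate above multiplies element counts by `sizeof` of the thrift structs, a deliberately cheap approximation: heap memory owned by nested members (string buffers, statistics values) is not walked. Note also that the column-chunk term adds `num_cols * sizeof(ColumnChunk)` once rather than per row group, which reads like a coarse lower bound. A self-contained sketch of the same pattern, with hypothetical types:

```cpp
#include <cstddef>
#include <string>
#include <vector>

struct ColumnChunkMeta {
	std::string path;
	long total_byte_size = 0;
};

struct RowGroupMeta {
	std::vector<ColumnChunkMeta> columns;
};

// Cheap per-container estimate: count the inline size of each element,
// but skip the heap memory owned by nested members (e.g. each 'path' buffer).
size_t EstimateMemory(const std::vector<RowGroupMeta> &row_groups, size_t num_columns) {
	size_t memory = sizeof(row_groups);
	memory += row_groups.size() * sizeof(RowGroupMeta);
	memory += row_groups.size() * num_columns * sizeof(ColumnChunkMeta);
	return memory;
}
```
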
9 changes: 5 additions & 4 deletions src/duckdb/extension/parquet/parquet_multi_file_info.cpp
@@ -525,17 +525,18 @@ shared_ptr<BaseFileReader> ParquetMultiFileInfo::CreateReader(ClientContext &con

shared_ptr<BaseUnionData> ParquetReader::GetUnionData(idx_t file_idx) {
auto result = make_uniq<ParquetUnionData>(file);
result->names.reserve(columns.size());
result->types.reserve(columns.size());
for (auto &column : columns) {
result->names.push_back(column.name);
result->types.push_back(column.type);
}

result->options = parquet_options;
result->metadata = metadata;
if (file_idx == 0) {
result->options = parquet_options;
result->metadata = metadata;
result->reader = shared_from_this();
} else {
result->options = std::move(parquet_options);
result->metadata = std::move(metadata);
result->root_schema = std::move(root_schema);
}
return std::move(result);
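
For every file after the first, the union data now takes the options and metadata by `std::move` rather than by copy: moving the shared metadata handle transfers ownership without atomic refcount traffic, and moving the options avoids a deep copy; only file 0 keeps its reader alive and therefore still needs copies. A generic illustration of the shared_ptr half of that reasoning (not the reader code itself):

```cpp
#include <cassert>
#include <memory>
#include <utility>

int main() {
	auto metadata = std::make_shared<int>(42);
	assert(metadata.use_count() == 1);

	auto copy = metadata;         // copy: atomic increment, both share ownership
	assert(metadata.use_count() == 2);

	auto moved = std::move(copy); // move: no refcount traffic, 'copy' becomes null
	assert(copy == nullptr);
	assert(metadata.use_count() == 2);
	return 0;
}
```
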
83 changes: 59 additions & 24 deletions src/duckdb/extension/parquet/parquet_reader.cpp
@@ -1,42 +1,30 @@
#include "parquet_reader.hpp"

#include "reader/boolean_column_reader.hpp"
#include "reader/callback_column_reader.hpp"
#include "duckdb/common/optional_ptr.hpp"
#include "duckdb/function/partition_stats.hpp"
#include "parquet_types.h"
#include "column_reader.hpp"
#include "duckdb.hpp"
#include "reader/expression_column_reader.hpp"
#include "parquet_geometry.hpp"
#include "reader/list_column_reader.hpp"
#include "parquet_crypto.hpp"
#include "parquet_file_metadata_cache.hpp"
#include "parquet_statistics.hpp"
#include "parquet_timestamp.hpp"
#include "mbedtls_wrapper.hpp"
#include "reader/row_number_column_reader.hpp"
#include "reader/string_column_reader.hpp"
#include "reader/variant_column_reader.hpp"
#include "reader/struct_column_reader.hpp"
#include "reader/templated_column_reader.hpp"
#include "thrift_tools.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/common/encryption_state.hpp"
#include "duckdb/common/file_system.hpp"
#include "duckdb/common/helper.hpp"
#include "duckdb/common/hive_partitioning.hpp"
#include "duckdb/common/string_util.hpp"
#include "duckdb/planner/table_filter.hpp"
#include "duckdb/storage/object_cache.hpp"
#include "duckdb/optimizer/statistics_propagator.hpp"
#include "duckdb/planner/table_filter_state.hpp"
#include "duckdb/common/multi_file/multi_file_reader.hpp"
#include "duckdb/logging/log_manager.hpp"
#include "duckdb/common/multi_file/multi_file_column_mapper.hpp"
#include "duckdb/common/encryption_functions.hpp"

#include <cassert>
#include <chrono>
#include <cstring>
#include <sstream>

namespace duckdb {

@@ -176,7 +164,7 @@ LoadMetadata(ClientContext &context, Allocator &allocator, CachingFileHandle &fi
}
ParquetCrypto::GenerateAdditionalAuthenticatedData(allocator, aad_crypto_metadata);
ParquetCrypto::Read(*metadata, *file_proto, encryption_config->GetFooterKey(), encryption_util,
std::move(aad_crypto_metadata));
aad_crypto_metadata);
} else {
metadata->read(file_proto.get());
}
@@ -650,8 +638,8 @@ ParquetColumnSchema ParquetReader::ParseSchemaRecursive(idx_t depth, idx_t max_d
if (is_repeated) {
auto list_type = LogicalType::LIST(result.type);
vector<ParquetColumnSchema> list_child = {std::move(result)};
result = ParquetColumnSchema::FromChildSchemas(s_ele.name, std::move(list_type), max_define, max_repeat,
this_idx, next_file_idx, std::move(list_child));
result = ParquetColumnSchema::FromChildSchemas(s_ele.name, list_type, max_define, max_repeat, this_idx,
next_file_idx, std::move(list_child));
}
result.parent_schema_index = this_idx;
return result;
@@ -665,8 +653,8 @@ ParquetColumnSchema ParquetReader::ParseSchemaRecursive(idx_t depth, idx_t max_d
if (s_ele.repetition_type == FieldRepetitionType::REPEATED) {
auto list_type = LogicalType::LIST(result.type);
vector<ParquetColumnSchema> list_child = {std::move(result)};
return ParquetColumnSchema::FromChildSchemas(s_ele.name, std::move(list_type), max_define, max_repeat,
this_idx, next_file_idx, std::move(list_child));
return ParquetColumnSchema::FromChildSchemas(s_ele.name, list_type, max_define, max_repeat, this_idx,
next_file_idx, std::move(list_child));
}

return result;
@@ -1233,17 +1221,64 @@ void ParquetReader::InitializeScan(ClientContext &context, ParquetReaderScanStat
}

void ParquetReader::GetPartitionStats(vector<PartitionStatistics> &result) {
GetPartitionStats(*GetFileMetadata(), result);
GetPartitionStats(*GetFileMetadata(), result, *root_schema, parquet_options);
}

void ParquetReader::GetPartitionStats(const duckdb_parquet::FileMetaData &metadata,
vector<PartitionStatistics> &result) {
struct ParquetPartitionRowGroup : public PartitionRowGroup {
ParquetPartitionRowGroup(const duckdb_parquet::FileMetaData &metadata_p,
optional_ptr<ParquetColumnSchema> root_schema_p,
optional_ptr<ParquetOptions> parquet_options_p, const idx_t row_group_idx_p)
: metadata(metadata_p), root_schema(root_schema_p), parquet_options(parquet_options_p),
row_group_idx(row_group_idx_p) {
}

const duckdb_parquet::FileMetaData &metadata;
const optional_ptr<ParquetColumnSchema> root_schema;
const optional_ptr<ParquetOptions> parquet_options;
const idx_t row_group_idx;

unique_ptr<BaseStatistics> GetColumnStatistics(const StorageIndex &storage_index) override {
const idx_t primary_index = storage_index.GetPrimaryIndex();
D_ASSERT(metadata.row_groups.size() > row_group_idx);
D_ASSERT(root_schema->children.size() > primary_index);

const auto &row_group = metadata.row_groups[row_group_idx];
const auto &column_schema = root_schema->children[primary_index];
return column_schema.Stats(metadata, *parquet_options, row_group_idx, row_group.columns);
}

bool MinMaxIsExact(const BaseStatistics &, const StorageIndex &storage_index) override {
const idx_t primary_index = storage_index.GetPrimaryIndex();
D_ASSERT(metadata.row_groups.size() > row_group_idx);
D_ASSERT(root_schema->children.size() > primary_index);

const auto &row_group = metadata.row_groups[row_group_idx];
const auto &column_chunk = row_group.columns[primary_index];

if (column_chunk.__isset.meta_data && column_chunk.meta_data.__isset.statistics &&
column_chunk.meta_data.statistics.__isset.is_min_value_exact &&
column_chunk.meta_data.statistics.__isset.is_max_value_exact) {
const auto &stats = column_chunk.meta_data.statistics;
return stats.is_min_value_exact && stats.is_max_value_exact;
}
return false;
}
};

void ParquetReader::GetPartitionStats(const duckdb_parquet::FileMetaData &metadata, vector<PartitionStatistics> &result,
optional_ptr<ParquetColumnSchema> root_schema,
optional_ptr<ParquetOptions> parquet_options) {
idx_t offset = 0;
for (auto &row_group : metadata.row_groups) {
for (idx_t i = 0; i < metadata.row_groups.size(); i++) {
auto &row_group = metadata.row_groups[i];
PartitionStatistics partition_stats;
partition_stats.row_start = offset;
partition_stats.count = row_group.num_rows;
partition_stats.count_type = CountType::COUNT_EXACT;
if (root_schema && parquet_options) {
partition_stats.partition_row_group =
make_shared_ptr<ParquetPartitionRowGroup>(metadata, root_schema, parquet_options, i);
}
offset += row_group.num_rows;
result.push_back(partition_stats);
}
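
`GetPartitionStats` can now attach a `ParquetPartitionRowGroup` to each partition entry, letting callers pull per-row-group column statistics lazily on top of the exact row counts. A sketch of why exact counts matter to a consumer, using simplified stand-in types rather than DuckDB's PartitionStatistics API:

```cpp
#include <cstdint>
#include <vector>

enum class CountType { COUNT_EXACT, COUNT_APPROXIMATE };

struct PartitionStats {
	uint64_t row_start = 0;
	uint64_t count = 0;
	CountType count_type = CountType::COUNT_APPROXIMATE;
};

// COUNT(*) can be answered from metadata alone when every partition
// reports an exact count; no data pages need to be read.
bool TryExactTotalCount(const std::vector<PartitionStats> &partitions, uint64_t &total) {
	total = 0;
	for (auto &p : partitions) {
		if (p.count_type != CountType::COUNT_EXACT) {
			return false;
		}
		total += p.count;
	}
	return true;
}
```
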
11 changes: 11 additions & 0 deletions src/duckdb/src/catalog/default/default_table_functions.cpp
@@ -70,6 +70,17 @@ FROM histogram_values(source, col_name, bin_count := bin_count, technique := tec
SELECT * EXCLUDE (message), UNNEST(parse_duckdb_log_message(log_type, message))
FROM duckdb_logs(denormalized_table=1)
WHERE type ILIKE log_type
)"},
{DEFAULT_SCHEMA, "duckdb_profiling_settings", {}, {}, R"(
SELECT * EXCLUDE(input_type, scope, aliases)
FROM duckdb_settings()
WHERE name IN (
'enable_profiling',
'profiling_coverage',
'profiling_output',
'profiling_mode',
'custom_profiling_settings'
);
)"},
{nullptr, nullptr, {nullptr}, {{nullptr, nullptr}}, nullptr}
};
4 changes: 2 additions & 2 deletions src/duckdb/src/common/arrow/schema_metadata.cpp
@@ -108,15 +108,15 @@ unsafe_unique_array<char> ArrowSchemaMetadata::SerializeMetadata() const {
memcpy(metadata_ptr, &key_size, sizeof(int32_t));
metadata_ptr += sizeof(int32_t);
// Key
memcpy(metadata_ptr, key.c_str(), key_size);
memcpy(metadata_ptr, key.c_str(), key.size());
metadata_ptr += key_size;
const std::string &value = pair.second;
const int32_t value_size = static_cast<int32_t>(value.size());
// Length of the value (int32)
memcpy(metadata_ptr, &value_size, sizeof(int32_t));
metadata_ptr += sizeof(int32_t);
// Value
memcpy(metadata_ptr, value.c_str(), value_size);
memcpy(metadata_ptr, value.c_str(), value.size());
metadata_ptr += value_size;
}
return metadata_array_ptr;
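
Copying with `key.size()` and `value.size()` instead of the int32_t-cast lengths keeps each `memcpy` consistent with the sizes used when the buffer was allocated, and drops an implicit int32 to size_t conversion. A self-contained sketch of the length-prefixed layout being written here, with hypothetical names:

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Serialize "count, then (len, bytes) per string" - the layout used for
// Arrow schema metadata. Illustrative only.
std::vector<char> SerializeStrings(const std::vector<std::string> &values) {
	size_t total = sizeof(int32_t);
	for (auto &v : values) {
		total += sizeof(int32_t) + v.size();
	}
	std::vector<char> buffer(total);
	char *ptr = buffer.data();

	const int32_t count = static_cast<int32_t>(values.size());
	std::memcpy(ptr, &count, sizeof(int32_t));
	ptr += sizeof(int32_t);

	for (auto &v : values) {
		const int32_t len = static_cast<int32_t>(v.size());
		std::memcpy(ptr, &len, sizeof(int32_t)); // length prefix (int32)
		ptr += sizeof(int32_t);
		std::memcpy(ptr, v.data(), v.size());    // copy with size_t, as in the fix
		ptr += v.size();
	}
	return buffer;
}
```
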
5 changes: 1 addition & 4 deletions src/duckdb/src/common/encryption_key_manager.cpp
@@ -51,10 +51,7 @@ void EncryptionKey::UnlockEncryptionKey(data_ptr_t key, idx_t key_len) {
}

EncryptionKeyManager &EncryptionKeyManager::GetInternal(ObjectCache &cache) {
if (!cache.Get<EncryptionKeyManager>(EncryptionKeyManager::ObjectType())) {
cache.Put(EncryptionKeyManager::ObjectType(), make_shared_ptr<EncryptionKeyManager>());
}
return *cache.Get<EncryptionKeyManager>(EncryptionKeyManager::ObjectType());
return *cache.GetOrCreate<EncryptionKeyManager>(EncryptionKeyManager::ObjectType());
}

EncryptionKeyManager &EncryptionKeyManager::Get(ClientContext &context) {
4 changes: 2 additions & 2 deletions src/duckdb/src/common/enum_util.cpp
@@ -3029,7 +3029,7 @@ const StringUtil::EnumStringLiteral *GetMetricTypeValues() {
{ static_cast<uint32_t>(MetricType::OPTIMIZER_CTE_INLINING), "OPTIMIZER_CTE_INLINING" },
{ static_cast<uint32_t>(MetricType::OPTIMIZER_COMMON_SUBPLAN), "OPTIMIZER_COMMON_SUBPLAN" },
{ static_cast<uint32_t>(MetricType::OPTIMIZER_JOIN_ELIMINATION), "OPTIMIZER_JOIN_ELIMINATION" },
{ static_cast<uint32_t>(MetricType::OPTIMIZER_COUNT_WINDOW_ELIMINATION), "OPTIMIZER_COUNT_WINDOW_ELIMINATION" },
{ static_cast<uint32_t>(MetricType::OPTIMIZER_WINDOW_SELF_JOIN), "OPTIMIZER_WINDOW_SELF_JOIN" },
{ static_cast<uint32_t>(MetricType::ALL_OPTIMIZERS), "ALL_OPTIMIZERS" },
{ static_cast<uint32_t>(MetricType::CUMULATIVE_OPTIMIZER_TIMING), "CUMULATIVE_OPTIMIZER_TIMING" },
{ static_cast<uint32_t>(MetricType::PHYSICAL_PLANNER), "PHYSICAL_PLANNER" },
@@ -3286,7 +3286,7 @@ const StringUtil::EnumStringLiteral *GetOptimizerTypeValues() {
{ static_cast<uint32_t>(OptimizerType::CTE_INLINING), "CTE_INLINING" },
{ static_cast<uint32_t>(OptimizerType::COMMON_SUBPLAN), "COMMON_SUBPLAN" },
{ static_cast<uint32_t>(OptimizerType::JOIN_ELIMINATION), "JOIN_ELIMINATION" },
{ static_cast<uint32_t>(OptimizerType::COUNT_WINDOW_ELIMINATION), "COUNT_WINDOW_ELIMINATION" }
{ static_cast<uint32_t>(OptimizerType::WINDOW_SELF_JOIN), "WINDOW_SELF_JOIN" }
};
return values;
}
2 changes: 1 addition & 1 deletion src/duckdb/src/common/enums/optimizer_type.cpp
@@ -45,7 +45,7 @@ static const DefaultOptimizerType internal_optimizer_types[] = {
{"cte_inlining", OptimizerType::CTE_INLINING},
{"common_subplan", OptimizerType::COMMON_SUBPLAN},
{"join_elimination", OptimizerType::JOIN_ELIMINATION},
{"count_window_elimination", OptimizerType::COUNT_WINDOW_ELIMINATION},
{"window_self_join", OptimizerType::WINDOW_SELF_JOIN},
{nullptr, OptimizerType::INVALID}};

string OptimizerTypeToString(OptimizerType type) {
2 changes: 1 addition & 1 deletion src/duckdb/src/execution/index/art/prefix_handle.cpp
@@ -20,7 +20,7 @@ PrefixHandle::PrefixHandle(FixedSizeAllocator &allocator, const Node node, const
}

PrefixHandle::PrefixHandle(PrefixHandle &&other) noexcept
: segment_handle(std::move(other.segment_handle)), data(other.data), child(other.child) {
: data(other.data), child(other.child), segment_handle(std::move(other.segment_handle)) {
other.data = nullptr;
other.child = nullptr;
}
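
The constructor fix reorders the initializer list to match the declaration order of the members. In C++, members are always initialized in declaration order, whatever order the initializer list is written in, so a mismatched list is misleading at best (compilers warn via -Wreorder) and a real bug when one initializer reads a member that is written earlier in the list but declared later. A standalone illustration:

```cpp
#include <iostream>

struct Reordered {
	int b; // declared first, so it is initialized first
	int a;

	// Written as a(42), b(a + 1), but initialization follows declaration order:
	// b(a + 1) runs before a(42) and reads an uninitialized 'a'.
	Reordered() : a(42), b(a + 1) {}
};

int main() {
	Reordered r;
	std::cout << "a=" << r.a << ", b=" << r.b << "\n"; // b is indeterminate, not 43
	return 0;
}
```
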
@@ -486,7 +486,7 @@ CSVStateMachineCache::CSVStateMachineCache() {

const StateMachine &CSVStateMachineCache::Get(const CSVStateMachineOptions &state_machine_options) {
// Custom State Machine, we need to create it and cache it first
lock_guard<mutex> parallel_lock(main_mutex);
const lock_guard<mutex> parallel_lock(main_mutex);
if (state_machine_cache.find(state_machine_options) == state_machine_cache.end()) {
Insert(state_machine_options);
}

@@ -180,7 +180,7 @@ void CSVGlobalState::FillRejectsTable(CSVFileScan &scan) {
auto limit = options.rejects_limit;
auto rejects = CSVRejectsTable::GetOrCreate(context, options.rejects_scan_name.GetValue(),
options.rejects_table_name.GetValue());
lock_guard<mutex> lock(rejects->write_lock);
const lock_guard<mutex> lock(rejects->write_lock);
auto &errors_table = rejects->GetErrorsTable(context);
auto &scans_table = rejects->GetScansTable(context);
InternalAppender errors_appender(context, errors_table);