Skip to content

Commit 3743008

Browse files
HeartLinkedwgtmac
andauthored
feat(rest): implement load table and drop table (#438)
Co-authored-by: Gang Wu <[email protected]>
1 parent 7e784dc commit 3743008

File tree

8 files changed

+199
-109
lines changed

8 files changed

+199
-109
lines changed

src/iceberg/catalog/rest/http_client.cc

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,12 @@ Status HandleFailureResponse(const cpr::Response& response,
135135
} // namespace
136136

137137
void HttpClient::PrepareSession(
138-
const std::string& path,
139-
const std::unordered_map<std::string, std::string>& request_headers,
140-
const std::unordered_map<std::string, std::string>& params) {
138+
const std::string& path, const std::unordered_map<std::string, std::string>& params,
139+
const std::unordered_map<std::string, std::string>& headers) {
141140
session_->SetUrl(cpr::Url{path});
142141
session_->SetParameters(GetParameters(params));
143142
session_->RemoveContent();
144-
auto final_headers = MergeHeaders(default_headers_, request_headers);
143+
auto final_headers = MergeHeaders(default_headers_, headers);
145144
session_->SetHeader(final_headers);
146145
}
147146

@@ -164,7 +163,7 @@ Result<HttpResponse> HttpClient::Get(
164163
cpr::Response response;
165164
{
166165
std::lock_guard guard(session_mutex_);
167-
PrepareSession(path, headers, params);
166+
PrepareSession(path, params, headers);
168167
response = session_->Get();
169168
}
170169

@@ -181,7 +180,7 @@ Result<HttpResponse> HttpClient::Post(
181180
cpr::Response response;
182181
{
183182
std::lock_guard guard(session_mutex_);
184-
PrepareSession(path, headers);
183+
PrepareSession(path, /*params=*/{}, headers);
185184
session_->SetBody(cpr::Body{body});
186185
response = session_->Post();
187186
}
@@ -206,7 +205,7 @@ Result<HttpResponse> HttpClient::PostForm(
206205
auto form_headers = headers;
207206
form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;
208207

209-
PrepareSession(path, form_headers);
208+
PrepareSession(path, /*params=*/{}, form_headers);
210209
std::vector<cpr::Pair> pair_list;
211210
pair_list.reserve(form_data.size());
212211
for (const auto& [key, val] : form_data) {
@@ -229,7 +228,7 @@ Result<HttpResponse> HttpClient::Head(
229228
cpr::Response response;
230229
{
231230
std::lock_guard guard(session_mutex_);
232-
PrepareSession(path, headers);
231+
PrepareSession(path, /*params=*/{}, headers);
233232
response = session_->Head();
234233
}
235234

@@ -240,12 +239,13 @@ Result<HttpResponse> HttpClient::Head(
240239
}
241240

242241
Result<HttpResponse> HttpClient::Delete(
243-
const std::string& path, const std::unordered_map<std::string, std::string>& headers,
242+
const std::string& path, const std::unordered_map<std::string, std::string>& params,
243+
const std::unordered_map<std::string, std::string>& headers,
244244
const ErrorHandler& error_handler) {
245245
cpr::Response response;
246246
{
247247
std::lock_guard guard(session_mutex_);
248-
PrepareSession(path, headers);
248+
PrepareSession(path, params, headers);
249249
response = session_->Delete();
250250
}
251251

src/iceberg/catalog/rest/http_client.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,14 @@ class ICEBERG_REST_EXPORT HttpClient {
104104

105105
/// \brief Sends a DELETE request.
106106
Result<HttpResponse> Delete(const std::string& path,
107+
const std::unordered_map<std::string, std::string>& params,
107108
const std::unordered_map<std::string, std::string>& headers,
108109
const ErrorHandler& error_handler);
109110

110111
private:
111112
void PrepareSession(const std::string& path,
112-
const std::unordered_map<std::string, std::string>& request_headers,
113-
const std::unordered_map<std::string, std::string>& params = {});
113+
const std::unordered_map<std::string, std::string>& params,
114+
const std::unordered_map<std::string, std::string>& headers);
114115

115116
std::unordered_map<std::string, std::string> default_headers_;
116117

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 82 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,33 @@ Result<CatalogConfig> FetchServerConfig(const ResourcePaths& paths,
7474
return CatalogConfigFromJson(json);
7575
}
7676

77+
#define ICEBERG_ENDPOINT_CHECK(endpoints, endpoint) \
78+
do { \
79+
if (!endpoints.contains(endpoint)) { \
80+
return NotSupported("Not supported endpoint: {}", endpoint.ToString()); \
81+
} \
82+
} while (0)
83+
84+
Result<bool> CaptureNoSuchObject(const auto& status, ErrorKind kind) {
85+
ICEBERG_DCHECK(kind == ErrorKind::kNoSuchTable || kind == ErrorKind::kNoSuchNamespace,
86+
"Invalid kind for CaptureNoSuchObject");
87+
if (status.has_value()) {
88+
return true;
89+
}
90+
if (status.error().kind == kind) {
91+
return false;
92+
}
93+
return std::unexpected(status.error());
94+
}
95+
96+
Result<bool> CaptureNoSuchTable(const auto& status) {
97+
return CaptureNoSuchObject(status, ErrorKind::kNoSuchTable);
98+
}
99+
100+
Result<bool> CaptureNoSuchNamespace(const auto& status) {
101+
return CaptureNoSuchObject(status, ErrorKind::kNoSuchNamespace);
102+
}
103+
77104
} // namespace
78105

79106
RestCatalog::~RestCatalog() = default;
@@ -126,9 +153,7 @@ RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
126153
std::string_view RestCatalog::name() const { return name_; }
127154

128155
Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns) const {
129-
ICEBERG_RETURN_UNEXPECTED(
130-
CheckEndpoint(supported_endpoints_, Endpoint::ListNamespaces()));
131-
156+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListNamespaces());
132157
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces());
133158
std::vector<Namespace> result;
134159
std::string next_token;
@@ -157,9 +182,7 @@ Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns)
157182

158183
Status RestCatalog::CreateNamespace(
159184
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
160-
ICEBERG_RETURN_UNEXPECTED(
161-
CheckEndpoint(supported_endpoints_, Endpoint::CreateNamespace()));
162-
185+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateNamespace());
163186
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces());
164187
CreateNamespaceRequest request{.namespace_ = ns, .properties = properties};
165188
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
@@ -173,9 +196,7 @@ Status RestCatalog::CreateNamespace(
173196

174197
Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespaceProperties(
175198
const Namespace& ns) const {
176-
ICEBERG_RETURN_UNEXPECTED(
177-
CheckEndpoint(supported_endpoints_, Endpoint::GetNamespaceProperties()));
178-
199+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::GetNamespaceProperties());
179200
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
180201
ICEBERG_ASSIGN_OR_RAISE(const auto response,
181202
client_->Get(path, /*params=*/{}, /*headers=*/{},
@@ -186,48 +207,29 @@ Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespacePr
186207
}
187208

188209
Status RestCatalog::DropNamespace(const Namespace& ns) {
189-
ICEBERG_RETURN_UNEXPECTED(
190-
CheckEndpoint(supported_endpoints_, Endpoint::DropNamespace()));
210+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DropNamespace());
191211
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
192-
ICEBERG_ASSIGN_OR_RAISE(
193-
const auto response,
194-
client_->Delete(path, /*headers=*/{}, *DropNamespaceErrorHandler::Instance()));
212+
ICEBERG_ASSIGN_OR_RAISE(const auto response,
213+
client_->Delete(path, /*params=*/{}, /*headers=*/{},
214+
*DropNamespaceErrorHandler::Instance()));
195215
return {};
196216
}
197217

198218
Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
199-
auto check = CheckEndpoint(supported_endpoints_, Endpoint::NamespaceExists());
200-
if (!check.has_value()) {
219+
if (!supported_endpoints_.contains(Endpoint::NamespaceExists())) {
201220
// Fall back to GetNamespaceProperties
202-
auto result = GetNamespaceProperties(ns);
203-
if (!result.has_value() && result.error().kind == ErrorKind::kNoSuchNamespace) {
204-
return false;
205-
}
206-
ICEBERG_RETURN_UNEXPECTED(result);
207-
// GET succeeded, namespace exists
208-
return true;
221+
return CaptureNoSuchNamespace(GetNamespaceProperties(ns));
209222
}
210223

211224
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
212-
auto response_or_error =
213-
client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance());
214-
if (!response_or_error.has_value()) {
215-
const auto& error = response_or_error.error();
216-
// catch NoSuchNamespaceException/404 and return false
217-
if (error.kind == ErrorKind::kNoSuchNamespace) {
218-
return false;
219-
}
220-
ICEBERG_RETURN_UNEXPECTED(response_or_error);
221-
}
222-
return true;
225+
return CaptureNoSuchNamespace(
226+
client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance()));
223227
}
224228

225229
Status RestCatalog::UpdateNamespaceProperties(
226230
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
227231
const std::unordered_set<std::string>& removals) {
228-
ICEBERG_RETURN_UNEXPECTED(
229-
CheckEndpoint(supported_endpoints_, Endpoint::UpdateNamespace()));
230-
232+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::UpdateNamespace());
231233
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->NamespaceProperties(ns));
232234
UpdateNamespacePropertiesRequest request{
233235
.removals = std::vector<std::string>(removals.begin(), removals.end()),
@@ -252,7 +254,7 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
252254
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
253255
const std::string& location,
254256
const std::unordered_map<std::string, std::string>& properties) {
255-
ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::CreateTable()));
257+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
256258
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
257259

258260
CreateTableRequest request{
@@ -294,24 +296,57 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
294296
return NotImplemented("Not implemented");
295297
}
296298

297-
Status RestCatalog::DropTable([[maybe_unused]] const TableIdentifier& identifier,
298-
[[maybe_unused]] bool purge) {
299-
return NotImplemented("Not implemented");
299+
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
300+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DeleteTable());
301+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
302+
303+
std::unordered_map<std::string, std::string> params;
304+
if (purge) {
305+
params["purgeRequested"] = "true";
306+
}
307+
ICEBERG_ASSIGN_OR_RAISE(
308+
const auto response,
309+
client_->Delete(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
310+
return {};
300311
}
301312

302-
Result<bool> RestCatalog::TableExists(
303-
[[maybe_unused]] const TableIdentifier& identifier) const {
304-
return NotImplemented("Not implemented");
313+
Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
314+
if (!supported_endpoints_.contains(Endpoint::TableExists())) {
315+
// Fall back to call LoadTable
316+
return CaptureNoSuchTable(LoadTableInternal(identifier));
317+
}
318+
319+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
320+
return CaptureNoSuchTable(
321+
client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance()));
305322
}
306323

307324
Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from,
308325
[[maybe_unused]] const TableIdentifier& to) {
309326
return NotImplemented("Not implemented");
310327
}
311328

312-
Result<std::shared_ptr<Table>> RestCatalog::LoadTable(
313-
[[maybe_unused]] const TableIdentifier& identifier) {
314-
return NotImplemented("Not implemented");
329+
Result<std::string> RestCatalog::LoadTableInternal(
330+
const TableIdentifier& identifier) const {
331+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
332+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
333+
ICEBERG_ASSIGN_OR_RAISE(
334+
const auto response,
335+
client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance()));
336+
return response.body();
337+
}
338+
339+
Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& identifier) {
340+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
341+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
342+
343+
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
344+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
345+
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
346+
347+
return Table::Make(identifier, std::move(load_result.metadata),
348+
std::move(load_result.metadata_location), file_io_,
349+
shared_from_this());
315350
}
316351

317352
Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
108108
std::shared_ptr<FileIO> file_io, std::unique_ptr<ResourcePaths> paths,
109109
std::unordered_set<Endpoint> endpoints);
110110

111+
Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;
112+
111113
std::unique_ptr<RestCatalogProperties> config_;
112114
std::shared_ptr<FileIO> file_io_;
113115
std::unique_ptr<HttpClient> client_;

src/iceberg/catalog/rest/rest_util.cc

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -253,12 +253,4 @@ std::string GetStandardReasonPhrase(int32_t status_code) {
253253
}
254254
}
255255

256-
Status CheckEndpoint(const std::unordered_set<Endpoint>& supported_endpoints,
257-
const Endpoint& endpoint) {
258-
if (!supported_endpoints.contains(endpoint)) {
259-
return NotSupported("Server does not support endpoint: {}", endpoint.ToString());
260-
}
261-
return {};
262-
}
263-
264256
} // namespace iceberg::rest

src/iceberg/catalog/rest/rest_util.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,4 @@ ICEBERG_REST_EXPORT std::unordered_map<std::string, std::string> MergeConfigs(
9393
/// Error").
9494
ICEBERG_REST_EXPORT std::string GetStandardReasonPhrase(int32_t status_code);
9595

96-
/// \brief Check whether the given endpoint is in the set of supported endpoints.
97-
///
98-
/// \param supported_endpoints Set of endpoints advertised by the server
99-
/// \param endpoint Endpoint to validate
100-
/// \return Status::OK if supported, NotSupported error otherwise
101-
ICEBERG_REST_EXPORT Status CheckEndpoint(
102-
const std::unordered_set<Endpoint>& supported_endpoints, const Endpoint& endpoint);
103-
10496
} // namespace iceberg::rest

0 commit comments

Comments
 (0)