11package dev .zarr .zarrjava .store ;
22
3- import com . squareup . okhttp .*;
3+ import okhttp3 .*;
44
55import javax .annotation .Nonnull ;
66import javax .annotation .Nullable ;
77import java .io .FilterInputStream ;
88import java .io .IOException ;
99import java .io .InputStream ;
1010import java .nio .ByteBuffer ;
11+ import java .time .Duration ;
1112
1213public class HttpStore implements Store {
1314
@@ -17,8 +18,16 @@ public class HttpStore implements Store {
1718 private final String uri ;
1819
1920 public HttpStore (@ Nonnull String uri ) {
20- this .httpClient = new OkHttpClient ();
21+ this (uri , 60 , 3 , 1000 );
22+ }
23+
24+ public HttpStore (@ Nonnull String uri , int timeoutSeconds , int maxRetries , long retryDelayMs ) {
2125 this .uri = uri ;
26+ this .httpClient = new OkHttpClient .Builder ()
27+ .connectTimeout (Duration .ofSeconds (timeoutSeconds ))
28+ .readTimeout (Duration .ofSeconds (timeoutSeconds ))
29+ .addInterceptor (new RetryInterceptor (maxRetries , retryDelayMs ))
30+ .build ();
2231 }
2332
2433 String resolveKeys (String [] keys ) {
@@ -37,9 +46,7 @@ String resolveKeys(String[] keys) {
3746
3847 @ Nullable
3948 ByteBuffer get (Request request , String [] keys ) {
40- Call call = httpClient .newCall (request );
41- try {
42- Response response = call .execute ();
49+ try (Response response = httpClient .newCall (request ).execute ()) {
4350 if (!response .isSuccessful ()) {
4451 if (response .code () == 404 ) {
4552 return null ;
@@ -49,12 +56,8 @@ ByteBuffer get(Request request, String[] keys) {
4956 keys ,
5057 new IOException ("HTTP request failed with status code: " + response .code () + " " + response .message ()));
5158 }
52- try (ResponseBody body = response .body ()) {
53- if (body == null ) {
54- return null ;
55- }
56- return ByteBuffer .wrap (body .bytes ());
57- }
59+ ResponseBody body = response .body ();
60+ return (body == null ) ? null : ByteBuffer .wrap (body .bytes ());
5861 } catch (IOException e ) {
5962 throw StoreException .readFailed (this .toString (), keys , e );
6063 }
@@ -63,9 +66,7 @@ ByteBuffer get(Request request, String[] keys) {
6366 @ Override
6467 public boolean exists (String [] keys ) {
6568 Request request = new Request .Builder ().head ().url (resolveKeys (keys )).build ();
66- Call call = httpClient .newCall (request );
67- try {
68- Response response = call .execute ();
69+ try (Response response = httpClient .newCall (request ).execute ()) {
6970 return response .isSuccessful ();
7071 } catch (IOException e ) {
7172 return false ;
@@ -129,28 +130,33 @@ public InputStream getInputStream(String[] keys, long start, long end) {
129130 }
130131 Request request = new Request .Builder ().url (resolveKeys (keys )).header (
131132 "Range" , String .format ("bytes=%d-%d" , start , end - 1 )).build ();
132- Call call = httpClient . newCall ( request );
133+
133134 try {
134- Response response = call .execute ();
135+ // We do NOT use try-with-resources here because the stream must remain open
136+ Response response = httpClient .newCall (request ).execute ();
135137 if (!response .isSuccessful ()) {
136138 if (response .code () == 404 ) {
139+ response .close ();
137140 return null ;
138141 }
139- throw StoreException .readFailed (
140- this .toString (),
141- keys ,
142- new IOException ("HTTP request failed with status code: " + response .code () + " " + response .message ()));
142+ int code = response .code ();
143+ String msg = response .message ();
144+ response .close ();
145+ throw StoreException .readFailed (this .toString (), keys ,
146+ new IOException ("HTTP request failed with status code: " + code + " " + msg ));
143147 }
148+
144149 ResponseBody body = response .body ();
145- if (body == null ) return null ;
146- InputStream stream = body .byteStream ();
150+ if (body == null ) {
151+ response .close ();
152+ return null ;
153+ }
147154
148- // Ensure closing the stream also closes the response
149- return new FilterInputStream (stream ) {
155+ return new FilterInputStream (body .byteStream ()) {
150156 @ Override
151157 public void close () throws IOException {
152158 super .close ();
153- body .close ();
159+ response .close (); // Closes both body and underlying connection
154160 }
155161 };
156162 } catch (IOException e ) {
@@ -169,9 +175,7 @@ public long getSize(String[] keys) {
169175 .header ("Accept-Encoding" , "identity" )
170176 .build ();
171177
172- Call call = httpClient .newCall (request );
173- try {
174- Response response = call .execute ();
178+ try (Response response = httpClient .newCall (request ).execute ()) {
175179 if (!response .isSuccessful ()) {
176180 return -1 ;
177181 }
@@ -193,4 +197,44 @@ public long getSize(String[] keys) {
193197 new IOException ("Failed to get content length from HTTP HEAD request to: " + url , e ));
194198 }
195199 }
196- }
200+
201+ /**
202+ * Internal interceptor to handle retries for all HttpStore requests.
203+ */
204+ private static class RetryInterceptor implements Interceptor {
205+ private final int maxRetries ;
206+ private final long delay ;
207+
208+ RetryInterceptor (int maxRetries , long delay ) {
209+ this .maxRetries = maxRetries ;
210+ this .delay = delay ;
211+ }
212+
213+ @ Override
214+ @ Nonnull
215+ public Response intercept (@ Nonnull Chain chain ) throws IOException {
216+ Request request = chain .request ();
217+ IOException lastException = null ;
218+
219+ for (int i = 0 ; i <= maxRetries ; i ++) {
220+ try {
221+ if (i > 0 ) Thread .sleep (delay );
222+ Response response = chain .proceed (request );
223+
224+ // Retry on common transient server errors (502, 503, 504)
225+ if (response .isSuccessful () || response .code () == 404 || i == maxRetries || response .code () < 500 ) {
226+ return response ;
227+ }
228+ response .close ();
229+ } catch (IOException e ) {
230+ lastException = e ;
231+ if (i == maxRetries ) throw e ;
232+ } catch (InterruptedException e ) {
233+ Thread .currentThread ().interrupt ();
234+ throw new IOException ("Retry interrupted" , e );
235+ }
236+ }
237+ throw lastException != null ? lastException : new IOException ("Request failed after retries" );
238+ }
239+ }
240+ }
0 commit comments