1717
1818package org .apache .hadoop .hdds .utils .db ;
1919
20- import static org .apache .hadoop . hdds . StringUtils . bytes2String ;
20+ import static org .apache .ratis . util . Preconditions . assertTrue ;
2121
2222import com .google .common .base .Preconditions ;
2323import java .io .Closeable ;
3030import java .util .function .Supplier ;
3131import org .apache .hadoop .hdds .utils .IOUtils ;
3232import org .apache .hadoop .hdds .utils .db .RocksDatabase .ColumnFamily ;
33+ import org .apache .hadoop .hdds .utils .db .managed .ManagedDirectSlice ;
34+ import org .apache .hadoop .hdds .utils .db .managed .ManagedSlice ;
3335import org .apache .hadoop .hdds .utils .db .managed .ManagedWriteBatch ;
3436import org .apache .hadoop .hdds .utils .db .managed .ManagedWriteOptions ;
3537import org .apache .ratis .util .TraditionalBinaryPrefix ;
3638import org .apache .ratis .util .UncheckedAutoCloseable ;
39+ import org .rocksdb .AbstractSlice ;
3740import org .slf4j .Logger ;
3841import org .slf4j .LoggerFactory ;
3942
@@ -80,26 +83,26 @@ private static String countSize2String(int count, long size) {
8083 * To implement {@link #equals(Object)} and {@link #hashCode()}
8184 * based on the contents of the bytes.
8285 */
83- static final class Bytes {
84- private final byte [] array ;
85- private final CodecBuffer buffer ;
86+ static final class Bytes implements Closeable {
87+ private final AbstractSlice <?> slice ;
8688 /** Cache the hash value. */
8789 private final int hash ;
8890
89- Bytes (CodecBuffer buffer ) {
90- this .array = null ;
91- this .buffer = Objects .requireNonNull (buffer , "buffer == null" );
92- this .hash = buffer .asReadOnlyByteBuffer ().hashCode ();
91+ static Bytes newBytes (CodecBuffer buffer ) {
92+ return buffer .isDirect () ? new Bytes (buffer .asReadOnlyByteBuffer ()) : new Bytes (buffer .getArray ());
9393 }
9494
95- Bytes (byte [] array ) {
96- this .array = array ;
97- this .buffer = null ;
98- this .hash = ByteBuffer .wrap (array ).hashCode ();
95+ Bytes (ByteBuffer buffer ) {
96+ Objects .requireNonNull (buffer , "buffer == null" );
97+ assertTrue (buffer .isDirect (), "buffer must be direct" );
98+ this .slice = new ManagedDirectSlice (buffer );
99+ this .hash = buffer .hashCode ();
99100 }
100101
101- ByteBuffer asReadOnlyByteBuffer () {
102- return buffer .asReadOnlyByteBuffer ();
102+ Bytes (byte [] array ) {
103+ Objects .requireNonNull (array , "array == null" );
104+ this .slice = new ManagedSlice (array );
105+ this .hash = ByteBuffer .wrap (array ).hashCode ();
103106 }
104107
105108 @ Override
@@ -113,11 +116,7 @@ public boolean equals(Object obj) {
113116 if (this .hash != that .hash ) {
114117 return false ;
115118 }
116- final ByteBuffer thisBuf = this .array != null ?
117- ByteBuffer .wrap (this .array ) : this .asReadOnlyByteBuffer ();
118- final ByteBuffer thatBuf = that .array != null ?
119- ByteBuffer .wrap (that .array ) : that .asReadOnlyByteBuffer ();
120- return thisBuf .equals (thatBuf );
119+ return slice .equals (that .slice );
121120 }
122121
123122 @ Override
@@ -127,12 +126,21 @@ public int hashCode() {
127126
128127 @ Override
129128 public String toString () {
130- return array != null ? bytes2String (array )
131- : bytes2String (asReadOnlyByteBuffer ());
129+ return slice .toString ();
130+ }
131+
132+ @ Override
133+ public void close () {
134+ slice .close ();
132135 }
133136 }
134137
135138 private abstract static class Op implements Closeable {
139+ private final Bytes keyBytes ;
140+
141+ private Op (Bytes keyBytes ) {
142+ this .keyBytes = keyBytes ;
143+ }
136144
137145 abstract void apply (ColumnFamily family , ManagedWriteBatch batch ) throws RocksDatabaseException ;
138146
@@ -148,6 +156,9 @@ int totalLength() {
148156
149157 @ Override
150158 public void close () {
159+ if (keyBytes != null ) {
160+ keyBytes .close ();
161+ }
151162 }
152163 }
153164
@@ -157,7 +168,8 @@ public void close() {
157168 private static final class DeleteOp extends Op {
158169 private final byte [] key ;
159170
160- private DeleteOp (byte [] key ) {
171+ private DeleteOp (byte [] key , Bytes keyBytes ) {
172+ super (Objects .requireNonNull (keyBytes , "keyBytes == null" ));
161173 this .key = Objects .requireNonNull (key , "key == null" );
162174 }
163175
@@ -180,7 +192,8 @@ private final class PutOp extends Op {
180192 private final CodecBuffer value ;
181193 private final AtomicBoolean closed = new AtomicBoolean (false );
182194
183- private PutOp (CodecBuffer key , CodecBuffer value ) {
195+ private PutOp (CodecBuffer key , CodecBuffer value , Bytes keyBytes ) {
196+ super (keyBytes );
184197 this .key = key ;
185198 this .value = value ;
186199 }
@@ -217,7 +230,8 @@ private static final class ByteArrayPutOp extends Op {
217230 private final byte [] key ;
218231 private final byte [] value ;
219232
220- private ByteArrayPutOp (byte [] key , byte [] value ) {
233+ private ByteArrayPutOp (byte [] key , byte [] value , Bytes keyBytes ) {
234+ super (keyBytes );
221235 this .key = Objects .requireNonNull (key , "key == null" );
222236 this .value = Objects .requireNonNull (value , "value == null" );
223237 }
@@ -323,20 +337,20 @@ void overwriteIfExists(Bytes key, Op op) {
323337 void put (CodecBuffer key , CodecBuffer value ) {
324338 putCount ++;
325339 // always release the key with the value
326- Bytes keyBytes = new Bytes (key );
327- overwriteIfExists (keyBytes , new PutOp (key , value ));
340+ Bytes keyBytes = Bytes . newBytes (key );
341+ overwriteIfExists (keyBytes , new PutOp (key , value , keyBytes ));
328342 }
329343
330344 void put (byte [] key , byte [] value ) {
331345 putCount ++;
332346 Bytes keyBytes = new Bytes (key );
333- overwriteIfExists (keyBytes , new ByteArrayPutOp (key , value ));
347+ overwriteIfExists (keyBytes , new ByteArrayPutOp (key , value , keyBytes ));
334348 }
335349
336350 void delete (byte [] key ) {
337351 delCount ++;
338352 Bytes keyBytes = new Bytes (key );
339- overwriteIfExists (keyBytes , new DeleteOp (key ));
353+ overwriteIfExists (keyBytes , new DeleteOp (key , keyBytes ));
340354 }
341355
342356 String putString (int keySize , int valueSize ) {
0 commit comments