2020package org .apache .ranger .audit .destination ;
2121
2222import java .io .File ;
23+ import java .io .IOException ;
2324import java .security .PrivilegedActionException ;
2425import java .util .ArrayList ;
2526import java .util .Arrays ;
@@ -98,7 +99,8 @@ public void init(Properties props, String propPrefix) {
9899 this .port = MiscUtil .getIntProperty (props , propPrefix + "." + CONFIG_PORT , 9200 );
99100 this .index = getStringProperty (props , propPrefix + "." + CONFIG_INDEX , DEFAULT_INDEX );
100101 this .hosts = getHosts ();
101- LOG .info ("Connecting to ElasticSearch: " + connectionString ());
102+
103+ LOG .info ("Connecting to ElasticSearch: {}" , connectionString ());
102104 getClient (); // Initialize client
103105 }
104106
@@ -152,9 +154,7 @@ public boolean log(Collection<AuditEventBase> events) {
152154 addFailedCount (1 );
153155 logFailedEvent (Arrays .asList (itemRequest ), itemResponse .getFailureMessage ());
154156 } else {
155- if (LOG .isDebugEnabled ()) {
156- LOG .debug (String .format ("Indexed %s" , itemRequest .getEventKey ()));
157- }
157+ LOG .debug ("Indexed {}" , itemRequest .getEventKey ());
158158 addSuccessCount (1 );
159159 ret = true ;
160160 }
@@ -219,10 +219,7 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
219219 .map (x -> new HttpHost (x , port , protocol ))
220220 .toArray (HttpHost []::new )
221221 );
222- ThreadFactory clientThreadFactory = new ThreadFactoryBuilder ()
223- .setNameFormat ("ElasticSearch rest client %s" )
224- .setDaemon (true )
225- .build ();
222+ ThreadFactory clientThreadFactory = new ThreadFactoryBuilder ().setNameFormat ("ElasticSearch rest client %s" ).setDaemon (true ).build ();
226223 if (StringUtils .isNotBlank (user ) && StringUtils .isNotBlank (password ) && !user .equalsIgnoreCase ("NONE" ) && !password .equalsIgnoreCase ("NONE" )) {
227224 if (password .contains ("keytab" ) && new File (password ).exists ()) {
228225 final KerberosCredentialsProvider credentialsProvider =
@@ -236,8 +233,7 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
236233 return clientBuilder ;
237234 });
238235 } else {
239- final CredentialsProvider credentialsProvider =
240- CredentialsProviderUtil .getBasicCredentials (user , password );
236+ final CredentialsProvider credentialsProvider = CredentialsProviderUtil .getBasicCredentials (user , password );
241237 restClientBuilder .setHttpClientConfigCallback (clientBuilder -> {
242238 clientBuilder .setThreadFactory (clientThreadFactory );
243239 clientBuilder .setDefaultCredentialsProvider (credentialsProvider );
@@ -257,42 +253,50 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
257253 }
258254
259255 private RestHighLevelClient newClient () {
256+ RestHighLevelClient restHighLevelClient = null ;
257+
260258 try {
261259 if (StringUtils .isNotBlank (user ) && StringUtils .isNotBlank (password ) && password .contains ("keytab" ) && new File (password ).exists ()) {
262260 subject = CredentialsProviderUtil .login (user , password );
263261 }
264- RestClientBuilder restClientBuilder =
265- getRestClientBuilder (hosts , protocol , user , password , port );
266- try (RestHighLevelClient restHighLevelClient = new RestHighLevelClient (restClientBuilder )) {
267- if (LOG .isDebugEnabled ()) {
268- LOG .debug ("Initialized client" );
269- }
270- boolean exists = false ;
271- try {
272- exists = restHighLevelClient .indices ().open (new OpenIndexRequest (this .index ), RequestOptions .DEFAULT ).isShardsAcknowledged ();
273- } catch (Exception e ) {
274- LOG .warn ("Error validating index " + this .index );
275- }
276- if (exists ) {
277- if (LOG .isDebugEnabled ()) {
278- LOG .debug ("Index exists" );
279- }
280- } else {
281- LOG .info ("Index does not exist" );
282- }
283- return restHighLevelClient ;
262+ RestClientBuilder restClientBuilder = getRestClientBuilder (hosts , protocol , user , password , port );
263+ restHighLevelClient = new RestHighLevelClient (restClientBuilder );
264+ boolean exists = false ;
265+
266+ try {
267+ exists = restHighLevelClient .indices ().open (new OpenIndexRequest (this .index ), RequestOptions .DEFAULT ).isShardsAcknowledged ();
268+ } catch (Exception e ) {
269+ LOG .warn ("Error validating index {}" , this .index );
284270 }
271+
272+ if (exists ) {
273+ LOG .debug ("Index exists" );
274+ } else {
275+ LOG .info ("Index does not exist" );
276+ }
277+
278+ return restHighLevelClient ;
285279 } catch (Throwable t ) {
286280 lastLoggedAt .updateAndGet (lastLoggedAt -> {
287281 long now = System .currentTimeMillis ();
288282 long elapsed = now - lastLoggedAt ;
289283 if (elapsed > TimeUnit .MINUTES .toMillis (1 )) {
290- LOG .error ("Can't connect to ElasticSearch server: " + connectionString (), t );
284+ LOG .error ("Can't connect to ElasticSearch server: {}" , connectionString (), t );
291285 return now ;
292286 } else {
293287 return lastLoggedAt ;
294288 }
295289 });
290+
291+ if (restHighLevelClient != null ) {
292+ try {
293+ restHighLevelClient .close ();
294+ LOG .debug ("Closed RestHighLevelClient after failure" );
295+ } catch (IOException e ) {
296+ LOG .warn ("Error closing RestHighLevelClient: {}" , e .getMessage (), e );
297+ }
298+ }
299+
296300 return null ;
297301 }
298302 }
@@ -346,5 +350,4 @@ Map<String, Object> toDoc(AuthzAuditEvent auditEvent) {
346350 doc .put ("policyVersion" , auditEvent .getPolicyVersion ());
347351 return doc ;
348352 }
349-
350353}
0 commit comments