Skip to content

Commit 4af0894

Browse files
authored
Exponentially retry 409s from GoogleIamDAO policy changes, to support concurrent cluster creations in the same project (#737)
1 parent 91d221c commit 4af0894

File tree

3 files changed

+74
-12
lines changed

3 files changed

+74
-12
lines changed

src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/ClusterMonitorActor.scala

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ package org.broadinstitute.dsde.workbench.leonardo.monitor
33
import java.time.Instant
44

55
import akka.actor.Status.Failure
6-
import akka.actor.{Actor, Props}
6+
import akka.actor.{Actor, ActorSystem, Props}
77
import akka.pattern.pipe
88
import cats.data.OptionT
99
import cats.implicits._
10+
import com.google.api.client.googleapis.json.GoogleJsonResponseException
1011
import com.typesafe.scalalogging.LazyLogging
1112
import io.grpc.Status.Code
1213
import org.broadinstitute.dsde.workbench.google.{GoogleIamDAO, GoogleStorageDAO}
@@ -19,7 +20,7 @@ import org.broadinstitute.dsde.workbench.leonardo.model.google.ClusterStatus._
1920
import org.broadinstitute.dsde.workbench.leonardo.model.google.{ClusterStatus, IP, _}
2021
import org.broadinstitute.dsde.workbench.leonardo.monitor.ClusterMonitorActor._
2122
import org.broadinstitute.dsde.workbench.leonardo.monitor.ClusterMonitorSupervisor.{ClusterDeleted, ClusterSupervisorMessage, RemoveFromList}
22-
import org.broadinstitute.dsde.workbench.util.addJitter
23+
import org.broadinstitute.dsde.workbench.util.{Retry, addJitter}
2324
import slick.dbio.DBIOAction
2425

2526
import scala.collection.immutable.Set
@@ -63,9 +64,12 @@ class ClusterMonitorActor(val cluster: Cluster,
6364
val googleStorageDAO: GoogleStorageDAO,
6465
val dbRef: DbReference,
6566
val authProvider: LeoAuthProvider,
66-
val jupyterProxyDAO: JupyterDAO) extends Actor with LazyLogging {
67+
val jupyterProxyDAO: JupyterDAO) extends Actor with LazyLogging with Retry {
6768
import context._
6869

70+
// the Retry trait needs a reference to the ActorSystem
71+
override val system = context.system
72+
6973
override def preStart(): Unit = {
7074
super.preStart()
7175
scheduleInitialMonitorPass
@@ -342,6 +346,13 @@ class ClusterMonitorActor(val cluster: Cluster,
342346
transformed.value
343347
}
344348

349+
private def whenGoogle409(throwable: Throwable): Boolean = {
350+
throwable match {
351+
case t: GoogleJsonResponseException => t.getStatusCode == 409
352+
case _ => false
353+
}
354+
}
355+
345356
private def removeIamRolesForUser: Future[Unit] = {
346357
// Remove the Dataproc Worker IAM role for the cluster service account
347358
cluster.serviceAccountInfo.clusterServiceAccount match {
@@ -356,7 +367,11 @@ class ClusterMonitorActor(val cluster: Cluster,
356367
if (count > 0) {
357368
Future.successful(())
358369
} else {
359-
googleIamDAO.removeIamRolesForUser(cluster.googleProject, serviceAccountEmail, Set("roles/dataproc.worker"))
370+
// Retry 409s with exponential backoff. This can happen if concurrent policy updates are made in the same project.
371+
// Google recommends a retry in this case.
372+
retryExponentially(whenGoogle409, s"IAM policy change failed for Google project '${cluster.googleProject}'") { () =>
373+
googleIamDAO.removeIamRolesForUser(cluster.googleProject, serviceAccountEmail, Set("roles/dataproc.worker"))
374+
}
360375
}
361376
}
362377
}

src/main/scala/org/broadinstitute/dsde/workbench/leonardo/service/LeonardoService.scala

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package org.broadinstitute.dsde.workbench.leonardo.service
22

3-
import java.io.File
3+
import java.io.{File, IOException}
44
import java.time.Instant
55

66
import akka.actor.ActorSystem
@@ -686,15 +686,32 @@ class LeonardoService(protected val dataprocConfig: DataprocConfig,
686686
tea.value.void
687687
}
688688

689+
private def whenGoogle409(throwable: Throwable): Boolean = {
690+
throwable match {
691+
case t: GoogleJsonResponseException => t.getStatusCode == 409
692+
case _ => false
693+
}
694+
}
695+
689696
private[service] def addDataprocWorkerRoleToServiceAccount(googleProject: GoogleProject, serviceAccountOpt: Option[WorkbenchEmail]): Future[Unit] = {
690697
serviceAccountOpt.map { serviceAccountEmail =>
691-
googleIamDAO.addIamRolesForUser(googleProject, serviceAccountEmail, Set("roles/dataproc.worker"))
698+
// Retry 409s with exponential backoff. This can happen if concurrent policy updates are made in the same project.
699+
// Google recommends a retry in this case.
700+
val iamFuture: Future[Unit] = retryExponentially(whenGoogle409, s"IAM policy change failed for Google project '$googleProject'") { () =>
701+
googleIamDAO.addIamRolesForUser(googleProject, serviceAccountEmail, Set("roles/dataproc.worker"))
702+
}
703+
iamFuture
692704
} getOrElse Future.successful(())
693705
}
694706

695707
private[service] def removeDataprocWorkerRoleFromServiceAccount(googleProject: GoogleProject, serviceAccountOpt: Option[WorkbenchEmail]): Future[Unit] = {
696708
serviceAccountOpt.map { serviceAccountEmail =>
697-
googleIamDAO.removeIamRolesForUser(googleProject, serviceAccountEmail, Set("roles/dataproc.worker"))
709+
// Retry 409s with exponential backoff. This can happen if concurrent policy updates are made in the same project.
710+
// Google recommends a retry in this case.
711+
val iamFuture: Future[Unit] = retryExponentially(whenGoogle409, s"IAM policy change failed for Google project '$googleProject'") { () =>
712+
googleIamDAO.removeIamRolesForUser(googleProject, serviceAccountEmail, Set("roles/dataproc.worker"))
713+
}
714+
iamFuture
698715
} getOrElse Future.successful(())
699716
}
700717

src/test/scala/org/broadinstitute/dsde/workbench/leonardo/service/LeonardoServiceSpec.scala

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import akka.http.scaladsl.model.headers.OAuth2BearerToken
99
import akka.testkit.TestKit
1010
import com.google.api.client.googleapis.testing.json.GoogleJsonResponseExceptionFactoryTesting
1111
import com.google.api.client.testing.json.MockJsonFactory
12+
import com.typesafe.scalalogging.LazyLogging
1213
import org.broadinstitute.dsde.workbench.google.GoogleStorageDAO
1314
import org.broadinstitute.dsde.workbench.google.mock.{MockGoogleDataprocDAO, MockGoogleIamDAO, MockGoogleStorageDAO}
1415
import org.broadinstitute.dsde.workbench.leonardo.CommonTestData
@@ -25,19 +26,21 @@ import org.broadinstitute.dsde.workbench.leonardo.monitor.NoopActor
2526
import org.broadinstitute.dsde.workbench.leonardo.util.BucketHelper
2627
import org.broadinstitute.dsde.workbench.model._
2728
import org.broadinstitute.dsde.workbench.model.google._
29+
import org.broadinstitute.dsde.workbench.util.Retry
2830
import org.mockito.ArgumentMatchers.{any, eq => mockitoEq}
2931
import org.mockito.Mockito.{never, verify, _}
3032
import org.scalatest._
3133
import org.scalatest.concurrent.Eventually.eventually
3234
import org.scalatest.concurrent.ScalaFutures
35+
import org.scalatest.time.{Minutes, Span}
3336
import spray.json._
3437

3538
import scala.concurrent.duration._
3639
import scala.concurrent.{Await, ExecutionContext, Future}
3740

3841
class LeonardoServiceSpec extends TestKit(ActorSystem("leonardotest")) with FlatSpecLike with Matchers
3942
with BeforeAndAfter with BeforeAndAfterAll with TestComponent with ScalaFutures
40-
with OptionValues with CommonTestData with LeoComponent {
43+
with OptionValues with CommonTestData with LeoComponent with Retry with LazyLogging {
4144

4245
private var gdDAO: MockGoogleDataprocDAO = _
4346
private var computeDAO: MockGoogleComputeDAO = _
@@ -975,8 +978,9 @@ class LeonardoServiceSpec extends TestKit(ActorSystem("leonardotest")) with Flat
975978
new MockGoogleStorageDAO
976979
}
977980

978-
//we meed to use a special version of the MockGoogleDataprocDAO to simulate an error during the call to resizeCluster
979-
leo = new LeonardoService(dataprocConfig, clusterFilesConfig, clusterResourcesConfig, clusterDefaultsConfig, proxyConfig, swaggerConfig, autoFreezeConfig, mockGoogleDataprocDAO, computeDAO, new ErroredMockGoogleIamDAO, storageDAO, mockPetGoogleDAO, DbSingleton.ref, authProvider, serviceAccountProvider, whitelist, bucketHelper, contentSecurityPolicy)
981+
//we meed to use a special version of the MockGoogleIamDAO to simulate an error when adding IAM roles
982+
val iamDAO = new ErroredMockGoogleIamDAO
983+
leo = new LeonardoService(dataprocConfig, clusterFilesConfig, clusterResourcesConfig, clusterDefaultsConfig, proxyConfig, swaggerConfig, autoFreezeConfig, mockGoogleDataprocDAO, computeDAO, iamDAO, storageDAO, mockPetGoogleDAO, DbSingleton.ref, authProvider, serviceAccountProvider, whitelist, bucketHelper, contentSecurityPolicy)
980984

981985
// create the cluster
982986
val clusterCreateResponse =
@@ -985,6 +989,30 @@ class LeonardoServiceSpec extends TestKit(ActorSystem("leonardotest")) with Flat
985989
eventually {
986990
dbFutureValue { _.clusterQuery.getClusterStatus(clusterCreateResponse.id) } shouldBe Some(ClusterStatus.Error)
987991
}
992+
993+
// IAM call should not have been retried
994+
iamDAO.invocationCount shouldBe 1
995+
}
996+
997+
it should "retry 409 errors when adding dataproc worker role" in isolatedDbTest {
998+
val mockPetGoogleDAO: String => GoogleStorageDAO = _ => {
999+
new MockGoogleStorageDAO
1000+
}
1001+
1002+
//we meed to use a special version of the MockGoogleIamDAO to simulate a conflict when adding IAM roles
1003+
val iamDAO = new ErroredMockGoogleIamDAO(409)
1004+
leo = new LeonardoService(dataprocConfig, clusterFilesConfig, clusterResourcesConfig, clusterDefaultsConfig, proxyConfig, swaggerConfig, autoFreezeConfig, mockGoogleDataprocDAO, computeDAO, iamDAO, storageDAO, mockPetGoogleDAO, DbSingleton.ref, authProvider, serviceAccountProvider, whitelist, bucketHelper, contentSecurityPolicy)
1005+
1006+
// create the cluster
1007+
val clusterCreateResponse =
1008+
leo.processClusterCreationRequest(userInfo, project, name1, testClusterRequest).futureValue
1009+
1010+
eventually(timeout(Span(5, Minutes))) {
1011+
dbFutureValue { _.clusterQuery.getClusterStatus(clusterCreateResponse.id) } shouldBe Some(ClusterStatus.Error)
1012+
}
1013+
1014+
// IAM call should have been retried exponentially
1015+
iamDAO.invocationCount shouldBe exponentialBackOffIntervals.size + 1
9881016
}
9891017

9901018
it should "update the autopause threshold for a cluster" in isolatedDbTest {
@@ -1158,10 +1186,12 @@ class LeonardoServiceSpec extends TestKit(ActorSystem("leonardotest")) with Flat
11581186
}
11591187
}
11601188

1161-
private class ErroredMockGoogleIamDAO extends MockGoogleIamDAO {
1189+
private class ErroredMockGoogleIamDAO(statusCode: Int = 400) extends MockGoogleIamDAO {
1190+
var invocationCount = 0
11621191
override def addIamRolesForUser(iamProject: GoogleProject, email: WorkbenchEmail, rolesToAdd: Set[String]): Future[Unit] = {
1192+
invocationCount += 1
11631193
val jsonFactory = new MockJsonFactory
1164-
val testException = GoogleJsonResponseExceptionFactoryTesting.newMock(jsonFactory, 400, "oh no i have failed")
1194+
val testException = GoogleJsonResponseExceptionFactoryTesting.newMock(jsonFactory, statusCode, "oh no i have failed")
11651195

11661196
Future.failed(testException)
11671197
}

0 commit comments

Comments
 (0)