Skip to content

Commit 99d4cea

Browse files
committed
code optimization
1 parent 6a2809a commit 99d4cea

File tree

2 files changed

+27
-14
lines changed

2 files changed

+27
-14
lines changed

linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ import org.apache.commons.lang3.exception.ExceptionUtils
4848

4949
import java.{lang, util}
5050
import java.text.MessageFormat
51+
import java.time.Instant
52+
import java.time.format.DateTimeFormatter
5153
import java.util.concurrent.ConcurrentHashMap
5254
import java.util.concurrent.TimeUnit
5355

@@ -331,19 +333,30 @@ abstract class EntranceServer extends Logging {
331333
).toLong
332334
undoneTask
333335
.filter { job =>
334-
val engineType = LabelUtil.getEngineType(job.getJobRequest.getLabels)
335-
val jobMetrics = Option(job.jobRequest.getMetrics)
336-
val startTime =
337-
if (jobMetrics.exists(_.containsKey(TaskConstant.JOB_RUNNING_TIME))) {
338-
jobMetrics.get.get(TaskConstant.JOB_RUNNING_TIME).toString.toLong
339-
} else {
340-
0L
341-
}
342-
engineType.contains(
343-
EntranceConfiguration.TASK_DIAGNOSIS_ENGINE_TYPE
344-
) && startTime != 0 && startTime < diagnosisTime && !diagnosedJobs.containsKey(
345-
job.getJobRequest.getId.toString
346-
)
336+
try {
337+
val engineType = LabelUtil.getEngineType(job.getJobRequest.getLabels)
338+
val jobMetrics =
339+
Option(JobHistoryHelper.getTaskByTaskID(job.getJobRequest.getId).getMetrics)
340+
val startTime =
341+
if (jobMetrics.exists(_.containsKey(TaskConstant.JOB_RUNNING_TIME))) {
342+
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ")
343+
val instant = Instant.from(
344+
formatter.parse(jobMetrics.get.get(TaskConstant.JOB_RUNNING_TIME).toString)
345+
)
346+
instant.toEpochMilli
347+
} else {
348+
0L
349+
}
350+
engineType.contains(
351+
EntranceConfiguration.TASK_DIAGNOSIS_ENGINE_TYPE
352+
) && startTime != 0 && startTime < diagnosisTime && !diagnosedJobs.containsKey(
353+
job.getJobRequest.getId.toString
354+
)
355+
} catch {
356+
case t: Throwable =>
357+
logger.error(s"Failed to check task for diagnosis, reason: ${t.getMessage}", t)
358+
false
359+
}
347360
}
348361
.foreach { job =>
349362
val jobId = job.getJobRequest.getId

linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ object JobHistoryHelper extends Logging {
266266
tasks
267267
}
268268

269-
private def getTaskByTaskID(taskID: Long): JobRequest = {
269+
def getTaskByTaskID(taskID: Long): JobRequest = {
270270
val jobRequest = new JobRequest
271271
jobRequest.setId(taskID)
272272
jobRequest.setSource(null)

0 commit comments

Comments
 (0)