Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion designer/client/src/actions/nk/validationsActions.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { getProcessName, getProcessProperties } from "../../components/graph/node-modal/NodeDetailsContent/selectors";
import { appendNodeDataToProperties, cleanProperties, isRequestSource } from "../../components/graph/node-modal/requestSourceAddons";
import HttpService from "../../http/HttpService/instance";
import { getScenarioGraph } from "../../reducers/selectors/graph";
import type { NodeType, PropertiesType } from "../../types/node";
import type { NodeValidationError } from "../../types/validation";
import type { NodeValidationError, VariableTypes } from "../../types/validation";
import type { ThunkAction } from "../reduxTypes";
import type { ValidationData, ValidationRequest } from "./nodeDetails";
import { nodeValidationDataUpdated } from "./nodeDetails";
Expand All @@ -24,6 +25,29 @@ export function validateScenarioProperties({ name, additionalFields }: Propertie
};
}

/**
 * Ensures the branch-variable-types map passed to node validation is keyed by the
 * branch ids the node actually declares. Branch ids may be stored either as node
 * ids or as node names, so each declared branch id is resolved against the map
 * directly and via both id→name and name→id aliases. Nodes without branch
 * parameters get the incoming map back unchanged (or `{}` when it is missing).
 */
function normalizeBranchVariableTypesForValidation(
    nodeData: NodeType,
    allBranchVariableTypes: Record<string, VariableTypes>,
    scenarioNodes: NodeType[],
): Record<string, VariableTypes> {
    const branchParameters = nodeData.branchParameters;
    // Non-join nodes declare no branches — pass the map through untouched.
    if (!branchParameters?.length) {
        return allBranchVariableTypes || {};
    }

    const nameById: Record<string, string> = {};
    const idByName: Record<string, string> = {};
    for (const graphNode of scenarioNodes || []) {
        nameById[graphNode.id] = graphNode.name;
        idByName[graphNode.name] = graphNode.id;
    }

    // For every declared branch, take the first alias under which types were provided.
    const normalized: Record<string, VariableTypes> = {};
    for (const { branchId } of branchParameters) {
        normalized[branchId] =
            allBranchVariableTypes?.[branchId] ||
            allBranchVariableTypes?.[idByName[branchId]] ||
            allBranchVariableTypes?.[nameById[branchId]] ||
            {};
    }
    return normalized;
}

export function validateNode({
nodeData,
...validationRequestData
Expand All @@ -38,9 +62,16 @@ export function validateNode({
}

const scenarioName = getProcessName(getState());
const scenarioGraph = getScenarioGraph(getState());
const branchVariableTypes = normalizeBranchVariableTypesForValidation(
nodeData,
validationRequestData.branchVariableTypes || {},
scenarioGraph.nodes || [],
);

const data = await HttpService.validateNode(scenarioName, {
...validationRequestData,
branchVariableTypes,
nodeData,
processProperties,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export default function BranchParameters({
<StyledFieldControl className="fieldsControl">
{node.branchParameters.map((branchParameter, branchIndex) => {
const branchId = branchParameter.branchId;
const canonicalBranchId = nodeNameById[branchId] || branchId;
//here we assume the parameters are correct wrt branch definition. If this is not the case,
//differences should be handled on other level, e.g. using reducers etc.
const paramIndex = branchParameter.parameters.findIndex(
Expand All @@ -67,9 +68,9 @@ export default function BranchParameters({
const paramValue = branchParameter.parameters[paramIndex];
const expressionPath = `branchParameters[${branchIndex}].parameters[${paramIndex}].expression`;

const contextId = ProcessUtils.findContextForBranch(node, branchId);
const contextId = ProcessUtils.findContextForBranch(node, canonicalBranchId);
const variables = findAvailableVariables(contextId, param);
const fieldName = `${paramName} for branch ${branchId}`;
const fieldName = `${paramName} for branch ${canonicalBranchId}`;
const fieldLabel = nodeNameById[branchId] || branchId;
const fieldErrors = getValidationErrorsForField(errors, fieldName).map((error) => {
if (fieldLabel === branchId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { isPlainObject, mapValues } from "lodash";
import type { PropsWithChildren } from "react";
import React, { createContext, memo, useCallback, useContext, useMemo, useReducer } from "react";

Expand All @@ -9,6 +10,21 @@ import { useAppSelector } from "../../../../store/storeHelpers";
import NodeUtils from "../../NodeUtils";
import type { VariableContextType } from "./VariableContextTree";

/**
 * Recursively rewrites object keys that are node ids into the corresponding node
 * names, using the given id→name lookup. Arrays are mapped element-wise; plain
 * objects get every key translated (unknown keys are kept as-is) and their values
 * processed recursively; any other value (primitives, class instances, etc.,
 * per lodash `isPlainObject`) is returned unchanged.
 */
function mapNodeIdKeysToNames(value: unknown, nodeNameById: Record<string, string>): unknown {
    if (Array.isArray(value)) {
        return value.map((element) => mapNodeIdKeysToNames(element, nodeNameById));
    }
    if (!isPlainObject(value)) {
        // Leaf value — nothing to translate.
        return value;
    }

    const translated: Record<string, unknown> = {};
    for (const [key, nestedValue] of Object.entries(value as Record<string, unknown>)) {
        translated[nodeNameById[key] || key] = mapNodeIdKeysToNames(nestedValue, nodeNameById);
    }
    return translated;
}

export type InputOutputState = {
inputDataSetId?: string | null;
outputDataSetId?: string | null;
Expand Down Expand Up @@ -73,6 +89,10 @@ export const InputOutputContextProvider = memo(function InputOutputContextProvid
}>) {
const scenario = useAppSelector(getScenarioGraph);
const testResults = useAppSelector(getTestResults);
const nodeNameById = useMemo<Record<string, string>>(
() => Object.fromEntries((scenario.nodes || []).map((graphNode) => [graphNode.id, graphNode.name])),
[scenario.nodes],
);

const [state, dispatch] = useReducer(reducer, initialState);

Expand Down Expand Up @@ -148,12 +168,16 @@ export const InputOutputContextProvider = memo(function InputOutputContextProvid
existing.nodeIds.push(contextNodeId);
return;
}
const variablesWithMappedNodeNames = mapValues(variables, (entry) => ({
...entry,
pretty: mapNodeIdKeysToNames(entry?.pretty, nodeNameById),
}));

const error = direction === "input" && getError(destinationNodeId, id);

contextMap.set(key, {
id,
variables,
variables: variablesWithMappedNodeNames,
disabled: isContextDisabled(id, direction),
nodeIds: [contextNodeId],
error: error?.throwable,
Expand All @@ -164,7 +188,7 @@ export const InputOutputContextProvider = memo(function InputOutputContextProvid
const count = transitionResults.reduce((sum, { totalCount = 0 }) => sum + totalCount, 0);
return [Array.from(contextMap.values()), count];
},
[inputs, outputs, getError, isContextDisabled],
[inputs, nodeNameById, outputs, getError, isContextDisabled],
);

const value = useMemo<ContextType>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,45 @@ class CanonicalProcessConverterSpec extends AnyFunSuite with Matchers with Table
val canonical = CanonicalProcessConverter.fromScenarioGraph(scenarioGraph, ProcessName("legacy-union"))

canonical.collectAllNodes.collect { case b: BranchEndData => b.definition } should contain only
BranchEndDefinition(sourceUuid, unionUuid)
BranchEndDefinition("source1", unionUuid)
}

test("normalize branch ids from UUID node ids to node names for joins") {
  // Deterministic UUIDs so the graph uses machine ids while names stay human-readable.
  def deterministicUuid(seed: String): String =
    UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8)).toString

  val sourceNodeUuid = deterministicUuid("source1")
  val unionNodeUuid  = deterministicUuid("Union")
  val sinkNodeUuid   = deterministicUuid("sink1")

  // source -> union (branch parameter keyed by the source's UUID id) -> sink
  val graphNodes = List(
    Source(NodeId(sourceNodeUuid), NodeName("source1"), SourceRef("sourceRef", List.empty)),
    Join(
      NodeId(unionNodeUuid),
      NodeName("Union"),
      Some("out"),
      "union",
      List.empty,
      List(BranchParameters(sourceNodeUuid, List.empty))
    ),
    Sink(NodeId(sinkNodeUuid), NodeName("sink1"), SinkRef("kafka", List.empty))
  )
  val graphEdges = List(
    Edge(NodeId(sourceNodeUuid), NodeId(unionNodeUuid), None),
    Edge(NodeId(unionNodeUuid), NodeId(sinkNodeUuid), None)
  )
  val scenarioGraph = ScenarioGraph(ProcessProperties(metaData), graphNodes, graphEdges)

  val canonical = CanonicalProcessConverter.fromScenarioGraph(scenarioGraph, ProcessName("uuid-branch-id-union"))

  // Conversion must rewrite the UUID branch id into the source node's name.
  val joinNode = canonical.collectAllNodes
    .collectFirst { case j: Join => j }
    .getOrElse(fail("Missing Join node after conversion"))
  joinNode.branchParameters.map(_.branchId) should contain only "source1"

  // The matching branch-end marker must carry the same name-based branch id.
  canonical.collectAllNodes.collect { case b: BranchEndData => b.definition } should contain only
    BranchEndDefinition("source1", unionNodeUuid)
}

test("handle large union scenario fixture with legacy name-based edge references") {
Expand Down Expand Up @@ -244,7 +282,7 @@ class CanonicalProcessConverterSpec extends AnyFunSuite with Matchers with Table

val unionBranchEnd = converted.collectAllNodes
.collectFirst {
case b: BranchEndData if b.definition.id == splitId.value => b
case b: BranchEndData if b.definition.id == "Split" => b
}
.getOrElse(fail("Missing BranchEndData for Split branch after conversion"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,66 @@ class UnionWithMemoTransformerSpec extends AnyFunSuite with FlinkSpec with Match
}
}

// Regression test: when branch ids are UUID strings (node ids) rather than node names,
// the union-memo output record must still retain the values contributed by each branch.
test("union with memo should keep non-null values for UUID-like branch ids") {
  // Branch ids deliberately use UUID-shaped strings instead of human-readable names.
  val BranchFooId = "72e08173-4a3c-45f0-ab26-4d1686a3e483"
  val BranchBarId = "15ebb252-2ec9-4829-bcd9-3d18a39c6940"

  // Two sources feed one union-memo join; both branches extract the same key/value fields.
  val process = ScenarioBuilder
    .streaming("sample-union-memo")
    .sources(
      GraphBuilder
        .source("start-foo", "start-foo")
        .branchEnd(BranchFooId, UnionNodeId.value),
      GraphBuilder
        .source("start-bar", "start-bar")
        .branchEnd(BranchBarId, UnionNodeId.value),
      GraphBuilder
        .join(
          UnionNodeId.value,
          "union-memo",
          Some(OutVariableName),
          List(
            BranchFooId -> List(
              "key" -> "#input.key".spel,
              "value" -> "#input.value".spel
            ),
            BranchBarId -> List(
              "key" -> "#input.key".spel,
              "value" -> "#input.value".spel
            )
          ),
          // Long state timeout so memoized branch values survive for the whole test run.
          "stateTimeout" -> s"T(${classOf[Duration].getName}).parse('PT2H')".spel
        )
        .emptySink(EndNodeId.value, "dead-end")
    )

  val key = "fooKey"
  // Blocking-queue sources let the test feed records one at a time and observe each emission.
  val sourceFoo = BlockingQueueSource.create[OneRecord](_.timestamp, Duration.ofHours(1))
  val sourceBar = BlockingQueueSource.create[OneRecord](_.timestamp, Duration.ofHours(1))

  ResultsCollectingListenerHolder.withListener { collectingListener =>
    // Output variable values observed at the sink node, as Scala maps.
    def outValues = {
      collectingListener.results
        .nodeResults(EndNodeId)
        .map(_.variableTyped[jul.Map[String @unchecked, AnyRef @unchecked]](OutVariableName).get.asScala)
    }

    withProcess(process, sourceFoo, sourceBar, collectingListener) {
      // First record arrives on the foo branch only.
      sourceFoo.add(OneRecord(key, 0, 123))
      eventually {
        outValues.last("key") shouldBe key
        outValues.last.values should contain(123)
      }
      // Second record on the bar branch: the memoized foo value (123) must still be present
      // alongside the new bar value (234) — this is what UUID branch ids previously broke.
      sourceBar.add(OneRecord(key, 1, 234))
      eventually {
        outValues.last("key") shouldBe key
        outValues.last.values should contain(123)
        outValues.last.values should contain(234)
      }
    }
  }
}

test("union with memo should handle input nodes named \"key\"") {
val BranchFooId = UnionWithMemoTransformer.KeyField
val BranchBarId = "bar"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ class UnionWithMemoTransformer(
val mapTypeInfo = typeInfoDetector
.forType(
Typed.record(
valueByBranchId.mapValuesNow(_.returnType),
valueByBranchId
.map { case (branchId, valueParam) =>
ContextTransformation.sanitizeBranchName(branchId) -> valueParam.returnType
},
Typed.typedClass[java.util.Map[_, _]]
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,34 @@ object CanonicalProcessConverter {
}

def fromScenarioGraph(graph: ScenarioGraph, name: ProcessName): CanonicalProcess = {
val nodesMap = graph.nodes.groupBy(_.id).mapValuesNow(_.head)
val rawNodesMap = graph.nodes.groupBy(_.id).mapValuesNow(_.head)
val nodeIdAliasesByName = graph.nodes.foldLeft(Map.empty[NodeId, NodeId]) { case (acc, node) =>
val alias = NodeId(node.name.value)
if (nodesMap.contains(alias)) acc else acc + (alias -> node.id)
if (rawNodesMap.contains(alias)) acc else acc + (alias -> node.id)
}
def normalizeNodeId(nodeId: NodeId): NodeId = nodeIdAliasesByName.getOrElse(nodeId, nodeId)

def normalizeJoinBranchId(branchId: String): String = {
val normalizedBranchNodeId = normalizeNodeId(NodeId(branchId))
rawNodesMap.get(normalizedBranchNodeId).map(_.name.value).getOrElse(branchId)
}

val nodesMap = graph.nodes
.map {
case join: Join =>
join.copy(
branchParameters = join.branchParameters.map(bp => bp.copy(branchId = normalizeJoinBranchId(bp.branchId)))
)
case other => other
}
.groupBy(_.id)
.mapValuesNow(_.head)

val normalizedEdges =
graph.edges.map(edge => edge.copy(from = normalizeNodeId(edge.from), to = normalizeNodeId(edge.to)))
val edgesFromMapStart = normalizedEdges.groupBy(_.from)
val rootsUnflattened =
findRootNodes(graph).map(headNode => unFlattenNode(nodesMap, None)(headNode, edgesFromMapStart))
findRootNodes(graph).map(headNode => unFlattenNode(nodesMap, None)(nodesMap(headNode.id), edgesFromMapStart))
val nodes = rootsUnflattened.headOption.getOrElse(List.empty)
val additionalBranches = if (rootsUnflattened.isEmpty) List.empty else rootsUnflattened.tail
CanonicalProcess(graph.toMetaData(name), nodes, additionalBranches, graph.stickyNotes, graph.testCases)
Expand Down Expand Up @@ -160,10 +177,8 @@ object CanonicalProcessConverter {
.toMap
canonicalnode.Fragment(data, nexts) :: Nil
case (data: Join, Some(edgeConnectedToJoin)) =>
// We are using "from" node's id as a branchId because for now branchExpressions are inside Join nodes and it is convenient
// way to connect both two things.
val joinId = edgeConnectedToJoin.from
canonicalnode.FlatNode(BranchEndData(BranchEndDefinition(joinId.value, data.id.value))) :: Nil
val branchId = nodeOrThrow(edgeConnectedToJoin.from).name.value
canonicalnode.FlatNode(BranchEndData(BranchEndDefinition(branchId, data.id.value))) :: Nil

}
(handleNestedNodes orElse (handleDirectNodes andThen { n =>
Expand Down