Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions chasm/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ func (r *Registry) TaskOf(taskGoType reflect.Type) (*RegistrableTask, bool) {
return r.taskOf(taskGoType)
}

func (r *Registry) TaskByID(id uint32) (*RegistrableTask, bool) {
return r.taskByID(id)
}

func (rc RegistrableComponent) FqType() string {
return rc.fqType()
}
Expand Down
5 changes: 5 additions & 0 deletions chasm/registrable_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ func (rc *RegistrableComponent) hasBusinessIDAlias() bool {
return ok
}

// GoType returns the reflect.Type of the component's Go struct.
func (rc *RegistrableComponent) GoType() reflect.Type {
return rc.goType
}

// fqType returns the fully qualified name of the component, which is a combination of
// the library name and the component type. This is used to uniquely identify
// the component in the registry.
Expand Down
5 changes: 5 additions & 0 deletions chasm/registrable_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func (rt *RegistrableTask) registerToLibrary(
return fqn, rt.taskTypeID, nil
}

// GoType returns the reflect.Type of the task's Go struct.
func (rt *RegistrableTask) GoType() reflect.Type {
return rt.goType
}

// fqType returns the fully qualified name of the task, which is a combination of
// the library name and the task type. This is used to uniquely identify
// the task in the registry.
Expand Down
5 changes: 5 additions & 0 deletions chasm/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (r *Registry) ComponentIDFor(componentInstance any) (uint32, bool) {
return rc.componentID, true
}

// TaskByID returns the registrable task for a given task type ID.
func (r *Registry) TaskByID(id uint32) (*RegistrableTask, bool) {
return r.taskByID(id)
}

// TaskFqnByID converts task type ID to fully qualified task type name.
// This method should only be used by CHASM framework internal code,
// NOT CHASM library developers.
Expand Down
213 changes: 213 additions & 0 deletions tools/tdbg/chasm_decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package tdbg

import (
"encoding/json"
"errors"
"fmt"
"reflect"

commonpb "go.temporal.io/api/common/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/common/codec"
"go.temporal.io/server/common/persistence/serialization"
"google.golang.org/protobuf/proto"
)

type decodedTask struct {
TypeID uint32 `json:"typeId"`
TaskFQN string `json:"taskFQN,omitempty"`
Destination string `json:"destination,omitempty"`
ScheduledTime string `json:"scheduledTime,omitempty"`
DecodedData json.RawMessage `json:"decodedData,omitempty"`
RawData *commonpb.DataBlob `json:"rawData,omitempty"`
VersionedTransition *persistencespb.VersionedTransition `json:"versionedTransition,omitempty"`
PhysicalTaskStatus int32 `json:"physicalTaskStatus,omitempty"`
}

type decodedChasmNode struct {
Metadata *persistencespb.ChasmNodeMetadata `json:"metadata"`
DecodedData json.RawMessage `json:"decodedData,omitempty"`
RawData *commonpb.DataBlob `json:"rawData,omitempty"`
NodeType string `json:"nodeType"`
ComponentFQN string `json:"componentFQN,omitempty"`
SideEffectTasks []*decodedTask `json:"sideEffectTasks,omitempty"`
PureTasks []*decodedTask `json:"pureTasks,omitempty"`
}

func getNodeType(metadata *persistencespb.ChasmNodeMetadata) string {
switch {
case metadata.GetComponentAttributes() != nil:
return "component"
case metadata.GetCollectionAttributes() != nil:
return "collection"
case metadata.GetDataAttributes() != nil:
return "data"
case metadata.GetPointerAttributes() != nil:
return "pointer"
default:
return "unknown"
}
}

func decodeTask(
task *persistencespb.ChasmComponentAttributes_Task,
registry *chasm.Registry,
) (*decodedTask, error) {
typeID := task.GetTypeId()
fqn, _ := registry.TaskFqnByID(typeID)

var scheduledTime string
if ts := task.GetScheduledTime(); ts != nil {
scheduledTime = ts.AsTime().UTC().Format(defaultDateTimeFormat)
}

decoded := &decodedTask{
TypeID: typeID,
TaskFQN: fqn,
Destination: task.GetDestination(),
ScheduledTime: scheduledTime,
VersionedTransition: task.GetVersionedTransition(),
PhysicalTaskStatus: task.GetPhysicalTaskStatus(),
}

rt, ok := registry.TaskByID(typeID)
if !ok {
decoded.RawData = task.GetData()
return decoded, nil
}

goType := rt.GoType()
if goType == nil {
decoded.RawData = task.GetData()
return decoded, nil
}

messageValue := reflect.New(goType.Elem())
message, ok := messageValue.Interface().(proto.Message)
if !ok {
decoded.RawData = task.GetData()
return decoded, nil
}

dataBlob := task.GetData()
if dataBlob != nil && len(dataBlob.GetData()) > 0 {
message = message.ProtoReflect().New().Interface()
if err := serialization.Decode(dataBlob, message); err != nil {
decoded.RawData = dataBlob
return decoded, nil
}
}

jsonBytes, err := codec.NewJSONPBEncoder().Encode(message)
if err != nil {
decoded.RawData = dataBlob
return decoded, nil
}

decoded.DecodedData = json.RawMessage(jsonBytes)
return decoded, nil
}

func decodeNode(node *persistencespb.ChasmNode, registry *chasm.Registry) (*decodedChasmNode, error) {
metadata := node.GetMetadata()
componentAttr := metadata.GetComponentAttributes()

if componentAttr == nil {
return &decodedChasmNode{
Metadata: metadata,
RawData: node.GetData(),
NodeType: getNodeType(metadata),
}, nil
}

typeID := componentAttr.GetTypeId()
rc, ok := registry.ComponentByID(typeID)
if !ok {
return &decodedChasmNode{
Metadata: metadata,
RawData: node.GetData(),
NodeType: "component (unknown)",
}, nil
}

result := &decodedChasmNode{
Metadata: metadata,
NodeType: getNodeType(metadata),
}

goType := rc.GoType()
if goType == nil {
return nil, errors.New("component has no Go type")
}

messageValue := reflect.New(goType.Elem())
message, ok := messageValue.Interface().(proto.Message)
if !ok {
result.RawData = node.GetData()
result.NodeType = "component (stateless)"
} else {
dataBlob := node.GetData()
if dataBlob != nil && len(dataBlob.GetData()) > 0 {
message = message.ProtoReflect().New().Interface()
if err := serialization.Decode(dataBlob, message); err != nil {
result.RawData = dataBlob
result.NodeType = "component (decode error)"
} else {
jsonBytes, err := codec.NewJSONPBEncoder().Encode(message)
if err != nil {
return nil, fmt.Errorf("failed to encode to JSON: %w", err)
}
result.DecodedData = json.RawMessage(jsonBytes)
result.NodeType = "component"
}
}
}

fqn, _ := registry.ComponentFqnByID(typeID)
result.ComponentFQN = fqn

sideEffectTasks, err := decodeTasks(componentAttr.GetSideEffectTasks(), registry)
if err != nil {
return nil, fmt.Errorf("failed to decode side effect task: %w", err)
}
result.SideEffectTasks = sideEffectTasks

pureTasks, err := decodeTasks(componentAttr.GetPureTasks(), registry)
if err != nil {
return nil, fmt.Errorf("failed to decode pure task: %w", err)
}
result.PureTasks = pureTasks

return result, nil
}

func decodeTasks(
tasks []*persistencespb.ChasmComponentAttributes_Task,
registry *chasm.Registry,
) ([]*decodedTask, error) {
result := make([]*decodedTask, len(tasks))
for i, task := range tasks {
decodedTask, err := decodeTask(task, registry)
if err != nil {
return nil, err
}
result[i] = decodedTask
}
return result, nil
}

func decodeChasmNodes(
chasmNodes map[string]*persistencespb.ChasmNode,
registry *chasm.Registry,
) (map[string]*decodedChasmNode, error) {
decoded := make(map[string]*decodedChasmNode, len(chasmNodes))
for path, node := range chasmNodes {
decodedNode, err := decodeNode(node, registry)
if err != nil {
return nil, fmt.Errorf("failed to decode node at path %q: %w", path, err)
}
decoded[path] = decodedNode
}
return decoded, nil
}
34 changes: 34 additions & 0 deletions tools/tdbg/chasm_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package tdbg

import (
"go.temporal.io/server/chasm"
chasmscheduler "go.temporal.io/server/chasm/lib/scheduler"
chasmtests "go.temporal.io/server/chasm/lib/tests"
chasmworkflow "go.temporal.io/server/chasm/lib/workflow"
"go.temporal.io/server/common/log"
)

func newChasmRegistry(logger log.Logger) (*chasm.Registry, error) {
registry := chasm.NewRegistry(logger)

if err := registry.Register(&chasm.CoreLibrary{}); err != nil {
return nil, err
}

if err := registry.Register(chasmworkflow.NewLibrary()); err != nil {
return nil, err
}

if err := registry.Register(chasmscheduler.NewLibrary(nil, nil, nil, nil, nil, nil)); err != nil {
return nil, err
}

if err := registry.Register(chasmtests.Library); err != nil {
return nil, err
}

// Note: Activity and Callback libraries are not included because their constructors
// are unexported. Add them if/when they're needed.

return registry, nil
}
Loading
Loading