Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.MultiNode.TestAdapter;
Expand Down Expand Up @@ -171,13 +172,13 @@ protected ClusterShardingRolePartitioningSpec(ClusterShardingMinMembersPerRoleCo
#endregion

[MultiNodeFact]
public void Cluster_Sharding_with_roles_specs()
public async Task Cluster_Sharding_with_roles_specs()
{
Cluster_Sharding_with_roles_must_start_the_cluster_await_convergence_init_sharding_on_every_node_2_data_types__akka_cluster_min_nr_of_members_2_partition_shard_location_by_2_roles();
Cluster_Sharding_with_roles_must_access_role_R2_nodes_4_5_from_one_of_the_proxy_nodes_1_2_3();
await Cluster_Sharding_with_roles_must_start_the_cluster_await_convergence_init_sharding_on_every_node_2_data_types__akka_cluster_min_nr_of_members_2_partition_shard_location_by_2_roles();
await Cluster_Sharding_with_roles_must_access_role_R2_nodes_4_5_from_one_of_the_proxy_nodes_1_2_3();
}

private void Cluster_Sharding_with_roles_must_start_the_cluster_await_convergence_init_sharding_on_every_node_2_data_types__akka_cluster_min_nr_of_members_2_partition_shard_location_by_2_roles()
private async Task Cluster_Sharding_with_roles_must_start_the_cluster_await_convergence_init_sharding_on_every_node_2_data_types__akka_cluster_min_nr_of_members_2_partition_shard_location_by_2_roles()
{
// start sharding early
StartSharding(
Expand All @@ -197,49 +198,66 @@ private void Cluster_Sharding_with_roles_must_start_the_cluster_await_convergenc
settings: Settings.Value.WithRole("R2"),
messageExtractor: new E2.MessageExtractor());

AwaitClusterUp(Config.First, Config.Second, Config.Third, Config.Fourth, Config.Fifth);
await AwaitClusterUpAsync(default, Config.First, Config.Second, Config.Third, Config.Fourth, Config.Fifth);

RunOn(() =>
await RunOnAsync(async () =>
{
// wait for all regions registered
AwaitAssert(() =>
await AwaitAssertAsync(async () =>
{
var region = ClusterSharding.Get(Sys).ShardRegion(E1.TypeKey);
region.Tell(GetCurrentRegions.Instance);
ExpectMsg<CurrentRegions>().Regions.Count.Should().Be(3);
(await ExpectMsgAsync<CurrentRegions>()).Regions.Count.Should().Be(3);
});
AwaitAssert(() =>
await AwaitAssertAsync(async () =>
{
var region = ClusterSharding.Get(Sys).ShardRegion(E2.TypeKey);
region.Tell(GetCurrentRegions.Instance);
ExpectMsg<CurrentRegions>().Regions.Count.Should().Be(2);
(await ExpectMsgAsync<CurrentRegions>()).Regions.Count.Should().Be(2);
});
}, Config.Fourth);

EnterBarrier($"{Roles.Count}-up");
await EnterBarrierAsync($"{Roles.Count}-up");
}

private void Cluster_Sharding_with_roles_must_access_role_R2_nodes_4_5_from_one_of_the_proxy_nodes_1_2_3()
private async Task Cluster_Sharding_with_roles_must_access_role_R2_nodes_4_5_from_one_of_the_proxy_nodes_1_2_3()
{
RunOn(() =>
await RunOnAsync(async () =>
{
// have first message reach the entity from a proxy with 2 nodes of role R2 and 'min-nr-of-members' set globally versus per role (nodes 4,5, with 1,2,3 proxying)
// RegisterProxy messages from nodes 1,2,3 are deadlettered
// Register messages sent are eventually successful on the fifth node, once coordinator moves to active state
var region = ClusterSharding.Get(Sys).ShardRegion(E2.TypeKey);
foreach (var n in Enumerable.Range(1, 20))

// Use AwaitAssert for the first message to handle coordinator readiness.
// The coordinator may not respond to GetShardHome requests until HasAllRegionsRegistered()
// returns true. This happens when _aliveRegions.Count >= _minMembers. Even though we verified
// regions are registered above, the coordinator's internal state (specifically _allRegionsRegistered)
// may not be set yet, causing GetShardHome requests to be silently ignored and messages to be
// buffered. The ShardRegion only retries GetShardHome on its retry interval (default 2-10s),
// which can exceed our ExpectMsg timeout.
// By wrapping the first message in AwaitAssert, we retry until the coordinator is fully ready.
await AwaitAssertAsync(async () =>
{
region.Tell(1);
await ExpectMsgAsync(1, TimeSpan.FromSeconds(3));
});

// After the first message succeeds, the shard is allocated and subsequent messages
// to the same or new shards should succeed without delay (coordinator is ready)
foreach (var n in Enumerable.Range(2, 19))
{
region.Tell(n);
ExpectMsg(n); // R2 entity received, does not timeout
await ExpectMsgAsync(n); // R2 entity received, does not timeout
}

region.Tell(new GetClusterShardingStats(TimeSpan.FromSeconds(10)));
var stats = ExpectMsg<ClusterShardingStats>();
var stats = await ExpectMsgAsync<ClusterShardingStats>();

stats.Regions.Keys.Should().BeEquivalentTo(fourthAddress, fifthAddress);
stats.Regions.Values.SelectMany(i => i.Stats.Values).Count().Should().Be(20);
}, Config.First);
EnterBarrier("proxy-node-other-role-to-shard");
await EnterBarrierAsync("proxy-node-other-role-to-shard");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,12 @@ await RunOnAsync(async () =>
await EnterBarrierAsync("down-second-node");
}, _config.Second);

// Note: Only run on Third since Second has already exited
await RunOnAsync(async () =>
{
await EnterBarrierAsync("down-second-node");
await AwaitMembersUpAsync(2, ImmutableHashSet.Create(secondAddress), 30.Seconds());
}, _config.Second, _config.Third);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, looks like this was a bug

}, _config.Third);

await EnterBarrierAsync("await-completion-2");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Cluster.TestKit;
using Akka.Configuration;
using Akka.MultiNode.TestAdapter;
using Akka.Remote.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;

namespace Akka.Cluster.Tests.MultiNode
{
Expand Down Expand Up @@ -47,41 +49,45 @@ protected NodeDowningAndBeingRemovedSpec(NodeDowningAndBeingRemovedSpecSpecConfi
}

[MultiNodeFact]
public void NodeDowningAndBeingRemovedSpecs()
public async Task NodeDowningAndBeingRemovedSpecs()
{
Node_that_is_downed_must_eventually_be_removed_from_membership();
await Node_that_is_downed_must_eventually_be_removed_from_membership();
}

public void Node_that_is_downed_must_eventually_be_removed_from_membership()
public async Task Node_that_is_downed_must_eventually_be_removed_from_membership()
{
AwaitClusterUp(_config.First, _config.Second, _config.Third);
await AwaitClusterUpAsync(CancellationToken.None, _config.First, _config.Second, _config.Third);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need the CancellationToken.None here


Within(TimeSpan.FromSeconds(30), () =>
var secondAddress = GetAddress(_config.Second);
var thirdAddress = GetAddress(_config.Third);

await WithinAsync(45.Seconds(), async () =>
{
RunOn(() =>
await RunOnAsync(() =>
{
Cluster.Down(GetAddress(_config.Second));
Cluster.Down(GetAddress(_config.Third));
Cluster.Down(secondAddress);
Cluster.Down(thirdAddress);
return Task.CompletedTask;
}, _config.First);
EnterBarrier("second-and-third-down");
await EnterBarrierAsync("second-and-third-down");

RunOn(() =>
await RunOnAsync(async () =>
{
// verify that the node is shut down
AwaitCondition(() => Cluster.IsTerminated);
await AwaitConditionAsync(() => Task.FromResult(Cluster.IsTerminated));
}, _config.Second, _config.Third);
EnterBarrier("second-and-third-shutdown");
await EnterBarrierAsync("second-and-third-shutdown");

RunOn(() =>
await RunOnAsync(async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
ClusterView.Members.Select(c => c.Address).Should().NotContain(GetAddress(_config.Second));
ClusterView.Members.Select(c => c.Address).Should().NotContain(GetAddress(_config.Third));
ClusterView.Members.Select(c => c.Address).Should().NotContain(secondAddress);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Address caching optimization - just re-using the ones we resolved earlier.

ClusterView.Members.Select(c => c.Address).Should().NotContain(thirdAddress);
});
}, _config.First);

EnterBarrier("finished");
await EnterBarrierAsync("finished");
});

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Cluster.TestKit;
using Akka.Configuration;
using Akka.MultiNode.TestAdapter;
using Akka.Remote.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;

namespace Akka.Cluster.Tests.MultiNode
{
Expand Down Expand Up @@ -46,48 +48,52 @@ protected NodeLeavingAndExitingAndBeingRemovedSpec(NodeLeavingAndExitingAndBeing
}

[MultiNodeFact]
public void NodeLeavingAndExitingAndBeingRemovedSpecs()
public async Task NodeLeavingAndExitingAndBeingRemovedSpecs()
{
Node_that_is_leaving_non_singleton_cluster_eventually_set_to_removed_and_removed_from_membership_ring_and_seen_table();
await Node_that_is_leaving_non_singleton_cluster_eventually_set_to_removed_and_removed_from_membership_ring_and_seen_table();
}

public void Node_that_is_leaving_non_singleton_cluster_eventually_set_to_removed_and_removed_from_membership_ring_and_seen_table()
public async Task Node_that_is_leaving_non_singleton_cluster_eventually_set_to_removed_and_removed_from_membership_ring_and_seen_table()
{
AwaitClusterUp(_config.First, _config.Second, _config.Third);
await AwaitClusterUpAsync(CancellationToken.None, _config.First, _config.Second, _config.Third);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need the CancellationToken.None here


Within(TimeSpan.FromSeconds(15), () =>
var secondAddress = GetAddress(_config.Second);

// Increased timeout from 15s to 45s for CI variability
await WithinAsync(45.Seconds(), async () =>
{
RunOn(() =>
await RunOnAsync(() =>
{
Cluster.Leave(GetAddress(_config.Second));
Cluster.Leave(secondAddress);
return Task.CompletedTask;
}, _config.First);
EnterBarrier("second-left");
await EnterBarrierAsync("second-left");

RunOn(() =>
await RunOnAsync(async () =>
{
EnterBarrier("second-shutdown");
await EnterBarrierAsync("second-shutdown");
// this test verifies that the removal is performed via the ExitingCompleted message,
// otherwise we would have `MarkNodeAsUnavailable(second)` to trigger the FailureDetectorPuppet

// verify that the 'second' node is no longer part of the 'members'/'unreachable' set
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
ClusterView.Members.Select(c => c.Address).Should().NotContain(GetAddress(_config.Second));
ClusterView.Members.Select(c => c.Address).Should().NotContain(secondAddress);
});
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
ClusterView.UnreachableMembers.Select(c => c.Address).Should().NotContain(GetAddress(_config.Second));
ClusterView.UnreachableMembers.Select(c => c.Address).Should().NotContain(secondAddress);
});
}, _config.First, _config.Third);

RunOn(() =>
await RunOnAsync(async () =>
{
// verify that the second node is shut down
AwaitCondition(() => Cluster.IsTerminated);
EnterBarrier("second-shutdown");
await AwaitConditionAsync(() => Task.FromResult(Cluster.IsTerminated));
await EnterBarrierAsync("second-shutdown");
}, _config.Second);

EnterBarrier("finished");
await EnterBarrierAsync("finished");
});

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.TestKit;
using Akka.Configuration;
using Akka.MultiNode.TestAdapter;
Expand Down Expand Up @@ -84,6 +85,11 @@ public async Task A_5_node_cluster_with_keep_one_indirectly_connected_off_should
{
var cluster = Cluster.Get(Sys);

// Set up termination signal using event-driven callback instead of polling
// This must be set up BEFORE the cluster is partitioned
var terminatedTcs = new TaskCompletionSource<Done>(TaskCreationOptions.RunContinuationsAsynchronously);
cluster.RegisterOnMemberRemoved(() => terminatedTcs.TrySetResult(Done.Instance));

RunOn(() =>
{
cluster.Join(cluster.SelfAddress);
Expand All @@ -93,9 +99,9 @@ public async Task A_5_node_cluster_with_keep_one_indirectly_connected_off_should
{
cluster.Join(Node(_config.Node1).Address);
}, _config.Node2, _config.Node3, _config.Node4, _config.Node5);
Within(TimeSpan.FromSeconds(10), () =>
await WithinAsync(TimeSpan.FromSeconds(10), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
cluster.State.Members.Count.Should().Be(5);
foreach (var m in cluster.State.Members)
Expand Down Expand Up @@ -125,9 +131,9 @@ await RunOnAsync(async () =>
}, _config.Node1);
await EnterBarrierAsync("blackholed-indirectly-connected");

Within(TimeSpan.FromSeconds(10), () =>
await WithinAsync(TimeSpan.FromSeconds(10), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
RunOn(() =>
{
Expand All @@ -149,6 +155,7 @@ await RunOnAsync(async () =>
});
await EnterBarrierAsync("unreachable");

// Node1 waits for SBR to complete and verify it's the only surviving member
await RunOnAsync(async () =>
{
await WithinAsync(TimeSpan.FromSeconds(15), async () =>
Expand All @@ -164,10 +171,25 @@ await AwaitAssertAsync(() =>
});
}, _config.Node1);

// Nodes 2,3,4,5 wait for termination using the event-driven callback
await RunOnAsync(async () =>
{
// downed
await AwaitConditionAsync(() => Task.FromResult(cluster.IsTerminated), max: TimeSpan.FromSeconds(15));
// Use event-driven notification via RegisterOnMemberRemoved
// This is more reliable than polling cluster.IsTerminated because:
// 1. The callback fires as soon as the member is removed/shutdown starts
// 2. The callback also fires in PostStop if the cluster daemon is stopping
// 3. No race between polling interval and actual state change
var completed = await Task.WhenAny(
terminatedTcs.Task,
Task.Delay(TimeSpan.FromSeconds(20)));

if (completed != terminatedTcs.Task)
{
// Fallback check - the cluster should definitely be terminated by now
cluster.IsTerminated.Should().BeTrue(
"Cluster should be terminated - either via MemberRemoved callback or shutdown. " +
$"Current self member status: {cluster.SelfMember.Status}");
}
}, _config.Node2, _config.Node3, _config.Node4, _config.Node5);

await EnterBarrierAsync("done");
Expand Down
Loading