Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
Expand Down Expand Up @@ -131,83 +132,94 @@ protected override void AtStartup()
}

[Fact]
public void Sharding_with_remember_entities_enabled_should_allow_a_change_to_the_shard_id_extractor()
public async Task Sharding_with_remember_entities_enabled_should_allow_a_change_to_the_shard_id_extractor()
{
WithSystem("FirstShardIdExtractor", new FirstExtractor(), (system, region) =>
await WithSystemAsync("FirstShardIdExtractor", new FirstExtractor(), async (system, region) =>
{
AssertRegionRegistrationComplete(region);
await AssertRegionRegistrationCompleteAsync(region);
region.Tell(new Message(1));
ExpectMsg("ack");
await ExpectMsgAsync<string>("ack");
region.Tell(new Message(11));
ExpectMsg("ack");
await ExpectMsgAsync<string>("ack");
region.Tell(new Message(21));
ExpectMsg("ack");
await ExpectMsgAsync<string>("ack");

var probe = CreateTestProbe(system);

AwaitAssert(() =>
await AwaitAssertAsync(async () =>
{
region.Tell(GetShardRegionState.Instance, probe.Ref);
var state = probe.ExpectMsg<CurrentShardRegionState>();
var state = await probe.ExpectMsgAsync<CurrentShardRegionState>();
// shards should have been remembered but migrated over to shard 2
state.Shards.Where(s => s.ShardId == "1").SelectMany(i => i.EntityIds).Should().BeEquivalentTo("1", "11", "21");
state.Shards.Where(s => s.ShardId == "2").SelectMany(i => i.EntityIds).Should().BeEmpty();
});
});

WithSystem("SecondShardIdExtractor", new SecondExtractor(), (system, region) =>
await WithSystemAsync("SecondShardIdExtractor", new SecondExtractor(), async (system, region) =>
{
var probe = CreateTestProbe(system);

AwaitAssert(() =>
await AwaitAssertAsync(async () =>
{
region.Tell(GetShardRegionState.Instance, probe.Ref);
var state = probe.ExpectMsg<CurrentShardRegionState>();
var state = await probe.ExpectMsgAsync<CurrentShardRegionState>();
// shards should have been remembered but migrated over to shard 2
state.Shards.Where(s => s.ShardId == "1").SelectMany(i => i.EntityIds).Should().BeEmpty();
state.Shards.Where(s => s.ShardId == "2").SelectMany(i => i.EntityIds).Should().BeEquivalentTo("1", "11", "21");
});
});

WithSystem("ThirdIncarnation", new SecondExtractor(), (system, region) =>
await WithSystemAsync("ThirdIncarnation", new SecondExtractor(), async (system, region) =>
{
var probe = CreateTestProbe(system);
// Only way to verify that they were "normal"-remember-started here is to look at debug logs, will show
// [akka://ThirdIncarnation@127.0.0.1:51533/system/sharding/ShardIdExtractorChange/1/RememberEntitiesStore] Recovery completed for shard [1] with [0] entities
// [akka://ThirdIncarnation@127.0.0.1:51533/system/sharding/ShardIdExtractorChange/2/RememberEntitiesStore] Recovery completed for shard [2] with [3] entities
AwaitAssert(() =>
await AwaitAssertAsync(async () =>
{
region.Tell(GetShardRegionState.Instance, probe.Ref);
var state = probe.ExpectMsg<CurrentShardRegionState>();
var state = await probe.ExpectMsgAsync<CurrentShardRegionState>();
state.Shards.Where(s => s.ShardId == "1").SelectMany(i => i.EntityIds).Should().BeEmpty();
state.Shards.Where(s => s.ShardId == "2").SelectMany(i => i.EntityIds).Should().BeEquivalentTo("1", "11", "21");
});
});
}

private void WithSystem(string systemName, IMessageExtractor extractor, Action<ActorSystem, IActorRef> f)
private async Task WithSystemAsync(string systemName, IMessageExtractor extractor, Func<ActorSystem, IActorRef, Task> f)
{
var system = ActorSystem.Create(systemName, Sys.Settings.Config);
InitializeLogger(system, $"[{systemName}]");
this.SetStore(system, Sys);
Cluster.Get(system).Join(Cluster.Get(system).SelfAddress);

var cluster = Cluster.Get(system);
cluster.Join(cluster.SelfAddress);

// Wait for cluster to form before starting sharding - fixes race condition
// where sharding coordinator singleton may not be elected in time
await AwaitAssertAsync(() =>
{
cluster.ReadView.Members.Count(m => m.Status == MemberStatus.Up).Should().Be(1);
});

try
{
var region = ClusterSharding.Get(system).Start(TypeName, Props.Create(() => new PA()), ClusterShardingSettings.Create(system), extractor);
f(system, region);
await f(system, region);
}
finally
{
system.Terminate().Wait(TimeSpan.FromSeconds(20));
await system.Terminate().WaitAsync(TimeSpan.FromSeconds(20));
}
}

private void AssertRegionRegistrationComplete(IActorRef region)
private async Task AssertRegionRegistrationCompleteAsync(IActorRef region)
{
AwaitAssert(() =>
await AwaitAssertAsync(async () =>
{
region.Tell(GetCurrentRegions.Instance);
ExpectMsg<CurrentRegions>().Regions.Should().HaveCount(1);
var response = await ExpectMsgAsync<CurrentRegions>();
response.Regions.Should().HaveCount(1);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
Expand Down Expand Up @@ -91,13 +92,18 @@ await AwaitAssertAsync(() =>
// <PushDaemonProxy>
// start the proxy on the proxy system, which runs on a different role not capable of hosting workers
IActorRef proxy = ShardedDaemonProcess.Get(_proxySystem).InitProxy(name, numWorkers, targetRole);

// ping some of the workers via the proxy
for(var i = 0; i < numWorkers; i++)

// Use AwaitAssertAsync to handle proxy registration timing - the ShardRegion proxy
// must register with the coordinator before it can route messages to shards
await AwaitAssertAsync(async () =>
{
var result = await proxy.Ask<int>(i);
result.Should().Be(i);
}
// ping some of the workers via the proxy
for(var i = 0; i < numWorkers; i++)
{
var result = await proxy.Ask<int>(i, TimeSpan.FromSeconds(3));
result.Should().Be(i);
}
});
// </PushDaemonProxy>
}

Expand Down
85 changes: 47 additions & 38 deletions src/core/Akka.Docs.Tests/Streams/StreamTcpDocTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,51 +91,54 @@ public void Simple_server_connection_must_close_incoming_connection()
}

[Fact]
public void Simple_server_must_initial_server_banner_echo_server()
public async Task Simple_server_must_initial_server_banner_echo_server()
{
var connections = Sys.TcpStream().Bind("127.0.0.1", 8888);
var serverProbe = CreateTestProbe();

#region welcome-banner-chat-server
connections.RunForeach(connection =>
{
// server logic, parses incoming commands
var commandParser = Flow.Create<string>().TakeWhile(c => c != "BYE").Select(c => c + "!");

var welcomeMessage = $"Welcome to: {connection.LocalAddress}, you are: {connection.RemoteAddress}!";
var welcome = Source.Single(welcomeMessage);

var serverLogic = Flow.Create<ByteString>()
.Via(Framing.Delimiter(
ByteString.FromString("\n"),
maximumFrameLength: 256,
allowTruncation: true))
.Select(c => c.ToString())
.Select(command =>
{
serverProbe.Tell(command);
return command;
})
.Via(commandParser)
.Merge(welcome)
.Select(c => c + "\n")
.Select(ByteString.FromString);

connection.HandleWith(serverLogic, Materializer);
}, Materializer);
// Use ToMaterialized to capture the binding task so we can await it
var (bindingTask, _) = Sys.TcpStream().Bind("127.0.0.1", 0) // Use port 0 for dynamic port assignment
.ToMaterialized(Sink.ForEach<Tcp.IncomingConnection>(connection =>
{
// server logic, parses incoming commands
var commandParser = Flow.Create<string>().TakeWhile(c => c != "BYE").Select(c => c + "!");

var welcomeMessage = $"Welcome to: {connection.LocalAddress}, you are: {connection.RemoteAddress}!";
var welcome = Source.Single(welcomeMessage);

var serverLogic = Flow.Create<ByteString>()
.Via(Framing.Delimiter(
ByteString.FromString("\n"),
maximumFrameLength: 256,
allowTruncation: true))
.Select(c => c.ToString())
.Select(command =>
{
serverProbe.Tell(command);
return command;
})
.Via(commandParser)
.Merge(welcome)
.Select(c => c + "\n")
.Select(ByteString.FromString);

connection.HandleWith(serverLogic, Materializer);
}), Keep.Both)
.Run(Materializer);
#endregion

// Wait for server to bind before connecting client - fixes race condition
var binding = await bindingTask;
var serverPort = ((System.Net.IPEndPoint)binding.LocalAddress).Port;

var input = new ConcurrentQueue<string>(new[] { "Hello world", "What a lovely day" });

string ReadLine(string prompt) => input.TryDequeue(out var cmd) ? cmd : "q";

{
var connection = Sys.TcpStream().OutgoingConnection("127.0.0.1", 8888);
}

try
{
#region repl-client
var connection = Sys.TcpStream().OutgoingConnection("127.0.0.1", 8888);
var connection = Sys.TcpStream().OutgoingConnection("127.0.0.1", serverPort);

var replParser = Flow.Create<string>().TakeWhile(c => c != "q")
.Concat(Source.Single("BYE"))
Expand All @@ -155,13 +158,19 @@ public void Simple_server_must_initial_server_banner_echo_server()
.Select(_ => ReadLine("> "))
.Via(replParser);

connection.Join(repl).Run(Materializer);
// Client stream runs in background - completion is not awaited
// since we verify behavior via serverProbe
_ = connection.Join(repl).Run(Materializer);
#endregion
}

serverProbe.ExpectMsg("Hello world", TimeSpan.FromSeconds(20));
serverProbe.ExpectMsg("What a lovely day");
serverProbe.ExpectMsg("BYE");
await serverProbe.ExpectMsgAsync<string>(s => s == "Hello world", TimeSpan.FromSeconds(20));
await serverProbe.ExpectMsgAsync<string>(s => s == "What a lovely day");
await serverProbe.ExpectMsgAsync<string>(s => s == "BYE");
}
finally
{
await binding.Unbind();
}
}
}
}
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/HubSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ await this.AssertAllStagesStoppedAsync(async () =>

hubSource.RunWith(Sink.Cancelled<int>(), Materializer);
hubSource.RunWith(Sink.FromSubscriber(downstream), Materializer);

await downstream.EnsureSubscriptionAsync();

await downstream.RequestAsync(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ public ParallelTestActorDeadlockSpec(ITestOutputHelper output)
[Fact(Timeout = 20000)]
public async Task Parallel_TestKit_startup_should_not_deadlock()
{
var concurrentTests = 40; // High parallelism to trigger the issue
// Reduced from 40 to 16 - still tests parallel startup behavior while staying
// within CI agent resource constraints. 40 concurrent TestKits causes shutdown
// cascade where blocking 5-second waits saturate the thread pool.
var concurrentTests = 16;

var tasks = Enumerable.Range(0, concurrentTests)
.Select(_ => Task.Run(RunOneTestKit))
Expand Down
Loading