Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/Adaptive.Aeron.Tests/CncFileDescriptorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,15 @@ namespace Adaptive.Aeron.Tests
{
class CncFileDescriptorTest
{
private EmbeddedMediaDriver _driver;

[SetUp]
public void StartDriver() => _driver = new EmbeddedMediaDriver();

[TearDown]
public void StopDriver() => _driver?.Dispose();

[Test]
[Ignore("Media driver needs to be running")]
public void ShouldAllocateCapacityForCounterMetadataBuffer()
{
string aeronDir = Aeron.Context.GetAeronDirectoryName();
Expand Down
10 changes: 8 additions & 2 deletions src/Adaptive.Aeron.Tests/ContextText.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@ namespace Adaptive.Aeron.Tests
[TestFixture]
public class ContextText
{
private EmbeddedMediaDriver _driver;

[SetUp]
public void StartDriver() => _driver = new EmbeddedMediaDriver();

[TearDown]
public void StopDriver() => _driver?.Dispose();

[Test]
[Ignore("Media driver needs to be running")]
public void ShouldNotAllowConcludeMoreThanOnce()
{
var ctx = new Aeron.Context();
Expand All @@ -16,7 +23,6 @@ public void ShouldNotAllowConcludeMoreThanOnce()
Assert.Throws(typeof(ConcurrentConcludeException), () => ctx.Conclude());
}
[Test]
[Ignore("Media driver needs to be running")]
public void ShouldAllowConcludeOfClonedContext()
{
var ctx = new Aeron.Context();
Expand Down
115 changes: 115 additions & 0 deletions src/Adaptive.Aeron.Tests/EmbeddedMediaDriver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using Adaptive.Agrona.Concurrent;
using NUnit.Framework;

namespace Adaptive.Aeron.Tests
{
internal sealed class EmbeddedMediaDriver : IDisposable
{
private const int StartupTimeoutMs = 15_000;
private const int ShutdownTimeoutMs = 10_000;

private readonly Process _driver;
private readonly string _aeronDir;

public EmbeddedMediaDriver()
{
_aeronDir = Aeron.Context.GetAeronDirectoryName();
if (Directory.Exists(_aeronDir))
{
try { Directory.Delete(_aeronDir, recursive: true); } catch { }
}

var rootDir = GetSolutionDirectory(TestContext.CurrentContext.TestDirectory)?.Parent
?? throw new FileNotFoundException("could not find root directory of project");
var jarPath = Path.Combine(rootDir.FullName, "driver", "media-driver.jar");

var psi = new ProcessStartInfo
{
FileName = "java",
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false
};
psi.ArgumentList.Add("--add-opens");
psi.ArgumentList.Add("java.base/jdk.internal.misc=ALL-UNNAMED");
psi.ArgumentList.Add("--add-opens");
psi.ArgumentList.Add("java.base/java.util.zip=ALL-UNNAMED");
psi.ArgumentList.Add("--add-opens");
psi.ArgumentList.Add("java.base/java.lang.reflect=ALL-UNNAMED");
psi.ArgumentList.Add("--add-opens");
psi.ArgumentList.Add("java.base/sun.nio.ch=ALL-UNNAMED");
psi.ArgumentList.Add("-cp");
psi.ArgumentList.Add(jarPath);
psi.ArgumentList.Add($"-Daeron.dir={_aeronDir}");
psi.ArgumentList.Add("-Daeron.driver.termination.validator=io.aeron.driver.DefaultAllowTerminationValidator");
psi.ArgumentList.Add("io.aeron.driver.MediaDriver");

_driver = Process.Start(psi)
?? throw new InvalidOperationException("failed to start media driver");

WaitForDriverReady();
}

public string AeronDirectoryName => _aeronDir;

public void Dispose()
{
try
{
var token = new UnsafeBuffer(Array.Empty<byte>());
Aeron.Context.RequestDriverTermination(
new DirectoryInfo(_aeronDir), token, 0, 0);
}
catch
{
// fall through to forceful kill
}

if (!_driver.WaitForExit(ShutdownTimeoutMs))
{
try { _driver.Kill(entireProcessTree: true); } catch { }
_driver.WaitForExit(ShutdownTimeoutMs);
}
_driver.Dispose();
}

private static void WaitForDriverReady()
{
var clock = new SystemEpochClock();
var deadline = clock.Time() + StartupTimeoutMs;
Exception last = null;

while (clock.Time() < deadline)
{
try
{
using var aeron = Aeron.Connect();
return;
}
catch (Exception e)
{
last = e;
Thread.Sleep(50);
}
}

throw new TimeoutException(
$"media driver did not become ready within {StartupTimeoutMs}ms", last);
}

private static DirectoryInfo GetSolutionDirectory(string currentPath)
{
var directory = new DirectoryInfo(currentPath ?? Directory.GetCurrentDirectory());
while (directory != null && !directory.GetFiles("*.sln").Any())
{
directory = directory.Parent;
}
return directory;
}
}
}
94 changes: 27 additions & 67 deletions src/Adaptive.Aeron.Tests/SystemTest.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using Adaptive.Aeron.LogBuffer;
Expand All @@ -11,75 +8,49 @@

namespace Adaptive.Aeron.Tests
{
[Ignore("intermittent on CI")]
public class SystemTest
{
private EmbeddedMediaDriver _driver;

[SetUp]
public void StartDriver() => _driver = new EmbeddedMediaDriver();

[TearDown]
public void StopDriver() => _driver?.Dispose();

[Test]
public void BasicMessageTest()
{
var rootDir = GetSolutionDirectory(TestContext.CurrentContext.TestDirectory).Parent;
using var aeron = Aeron.Connect();
var publication = aeron.AddPublication("aeron:ipc", 1);
var subscription = aeron.AddSubscription("aeron:ipc", 1);
Await(() => publication.IsConnected);

if (rootDir == null)
{
throw new FileNotFoundException("could not find root directory of project");
}
var testBytes = Encoding.ASCII.GetBytes("Hello World!");
var buffer = new UnsafeBuffer(testBytes);
Await(() => publication.Offer(buffer, 0, buffer.Capacity) > 0);

var jarPath = Path.Combine(rootDir.ToString(), "driver", "media-driver.jar");
bool messageReceived = false;

var psi = new ProcessStartInfo
void FragmentHandler(IDirectBuffer directBuffer, int offset, int length, Header header)
{
FileName = "java",
Arguments =
$"-cp \"{jarPath}\" -Daeron.driver.termination.validator=io.aeron.driver.DefaultAllowTerminationValidator io.aeron.driver.MediaDriver"
};

var driver = Process.Start(psi);

try
{
using var aeron = Aeron.Connect();
var publication = aeron.AddPublication("aeron:ipc", 1);
var subscription = aeron.AddSubscription("aeron:ipc", 1);
Await(() => publication.IsConnected);

var testBytes = Encoding.ASCII.GetBytes("Hello World!");
var buffer = new UnsafeBuffer(testBytes);
Await(() => publication.Offer(buffer, 0, buffer.Capacity) > 0);

bool messageReceived = false;

void FragmentHandler(IDirectBuffer directBuffer, int offset, int length, Header header)
if ("Hello World!" == directBuffer.GetStringWithoutLengthAscii(offset, length))
{
if ("Hello World!" == directBuffer.GetStringWithoutLengthAscii(offset, length))
{
messageReceived = true;
}
messageReceived = true;
}

Await(() =>
{
subscription.Poll(FragmentHandler, 10);
return messageReceived;
});
}
finally
{
var token = new UnsafeBuffer(Array.Empty<byte>());
Aeron.Context.RequestDriverTermination(
new DirectoryInfo(Aeron.Context.GetAeronDirectoryName()),
token,
0,
0
);

driver.WaitForExit();
}
Await(() =>
{
subscription.Poll(FragmentHandler, 10);
return messageReceived;
});
}

private void Await(Func<bool> predicate)
private static void Await(Func<bool> predicate)
{
var clock = new SystemEpochClock();
var deadline = clock.Time() + 5000L;
var deadline = clock.Time() + 15_000L;

while (!predicate())
{
Expand All @@ -91,16 +62,5 @@ private void Await(Func<bool> predicate)
Thread.Sleep(10);
}
}

private static DirectoryInfo GetSolutionDirectory(string currentPath = null)
{
var directory = new DirectoryInfo(currentPath ?? Directory.GetCurrentDirectory());
while (directory != null && !directory.GetFiles("*.sln").Any())
{
directory = directory.Parent;
}

return directory;
}
}
}
}
Loading