Skip to content
23 changes: 23 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/AgentResponseEvent.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Agents.AI.Workflows;
Expand All @@ -19,6 +20,28 @@ public AgentResponseEvent(string executorId, AgentResponse response) : base(resp
this.Response = Throw.IfNull(response);
}

/// <summary>
/// Initializes a new instance of the <see cref="AgentResponseEvent"/> class with the given output tag.
/// </summary>
/// <param name="executorId">The identifier of the executor that generated this event.</param>
/// <param name="response">The agent response.</param>
/// <param name="tag">The output tag to associate with this event.</param>
public AgentResponseEvent(string executorId, AgentResponse response, OutputTag tag) : base(response, executorId, tag)
{
this.Response = Throw.IfNull(response);
}

/// <summary>
/// Initializes a new instance of the <see cref="AgentResponseEvent"/> class with the given output tags.
/// </summary>
/// <param name="executorId">The identifier of the executor that generated this event.</param>
/// <param name="response">The agent response.</param>
/// <param name="tags">The output tags to associate with this event. May be <see langword="null"/> or empty.</param>
public AgentResponseEvent(string executorId, AgentResponse response, IEnumerable<OutputTag>? tags) : base(response, executorId, tags)
{
this.Response = Throw.IfNull(response);
}

/// <summary>
/// Gets the agent response.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,28 @@ public AgentResponseUpdateEvent(string executorId, AgentResponseUpdate update) :
this.Update = Throw.IfNull(update);
}

/// <summary>
/// Initializes a new instance of the <see cref="AgentResponseUpdateEvent"/> class with the given output tag.
/// </summary>
/// <param name="executorId">The identifier of the executor that generated this event.</param>
/// <param name="update">The agent run response update.</param>
/// <param name="tag">The output tag to associate with this event.</param>
public AgentResponseUpdateEvent(string executorId, AgentResponseUpdate update, OutputTag tag) : base(update, executorId, tag)
{
this.Update = Throw.IfNull(update);
}

/// <summary>
/// Initializes a new instance of the <see cref="AgentResponseUpdateEvent"/> class with the given output tags.
/// </summary>
/// <param name="executorId">The identifier of the executor that generated this event.</param>
/// <param name="update">The agent run response update.</param>
/// <param name="tags">The output tags to associate with this event. May be <see langword="null"/> or empty.</param>
public AgentResponseUpdateEvent(string executorId, AgentResponseUpdate update, IEnumerable<OutputTag>? tags) : base(update, executorId, tags)
{
this.Update = Throw.IfNull(update);
}

/// <summary>
/// Gets the agent run response update.
/// </summary>
Expand Down
93 changes: 35 additions & 58 deletions dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Specialized;
using Microsoft.Extensions.AI;
using Microsoft.Shared.Diagnostics;

Expand Down Expand Up @@ -37,31 +34,10 @@ private static Workflow BuildSequentialCore(string? workflowName, params IEnumer
{
Throw.IfNullOrEmpty(agents);

// Create a builder that chains the agents together in sequence. The workflow simply begins
// with the first agent in the sequence.

AIAgentHostOptions options = new()
{
ReassignOtherAgentsAsUsers = true,
ForwardIncomingMessages = true,
};

List<ExecutorBinding> agentExecutors = agents.Select(agent => agent.BindAsExecutor(options)).ToList();

ExecutorBinding previous = agentExecutors[0];
WorkflowBuilder builder = new(previous);

foreach (ExecutorBinding next in agentExecutors.Skip(1))
{
builder.AddEdge(previous, next);
previous = next;
}

OutputMessagesExecutor end = new();
builder = builder.AddEdge(previous, end).WithOutputFrom(end);
SequentialWorkflowBuilder builder = new(agents);
if (workflowName is not null)
{
builder = builder.WithName(workflowName);
builder.WithName(workflowName);
}
return builder.Build();
}
Expand Down Expand Up @@ -107,41 +83,14 @@ private static Workflow BuildConcurrentCore(
{
Throw.IfNull(agents);

// A workflow needs a starting executor, so we create one that forwards everything to each agent.
ChatForwardingExecutor start = new("Start");
WorkflowBuilder builder = new(start);

// For each agent, we create an executor to host it and an accumulator to batch up its output messages,
// so that the final accumulator receives a single list of messages from each agent. Otherwise, the
// accumulator would not be able to determine what came from what agent, as there's currently no
// provenance tracking exposed in the workflow context passed to a handler.

ExecutorBinding[] agentExecutors = (from agent in agents
select agent.BindAsExecutor(new AIAgentHostOptions() { ReassignOtherAgentsAsUsers = true })).ToArray();
ExecutorBinding[] accumulators = [.. from agent in agentExecutors select (ExecutorBinding)new AggregateTurnMessagesExecutor($"Batcher/{agent.Id}")];
builder.AddFanOutEdge(start, agentExecutors);

for (int i = 0; i < agentExecutors.Length; i++)
ConcurrentWorkflowBuilder builder = new(agents);
if (workflowName is not null)
{
builder.AddEdge(agentExecutors[i], accumulators[i]);
builder.WithName(workflowName);
}

// Create the accumulating executor that will gather the results from each agent, and connect
// each agent's accumulator to it. If no aggregation function was provided, we default to returning
// the last message from each agent
aggregator ??= static lists => (from list in lists where list.Count > 0 select list.Last()).ToList();

Func<string, string, ValueTask<ConcurrentEndExecutor>> endFactory =
(_, __) => new(new ConcurrentEndExecutor(agentExecutors.Length, aggregator));

ExecutorBinding end = endFactory.BindExecutor(ConcurrentEndExecutor.ExecutorId);

builder.AddFanInBarrierEdge(accumulators, end);

builder = builder.WithOutputFrom(end);
if (workflowName is not null)
if (aggregator is not null)
{
builder = builder.WithName(workflowName);
builder.WithAggregator(aggregator);
}
return builder.Build();
}
Expand Down Expand Up @@ -179,4 +128,32 @@ public static GroupChatWorkflowBuilder CreateGroupChatBuilderWith(Func<IReadOnly
Throw.IfNull(managerFactory);
return new GroupChatWorkflowBuilder(managerFactory);
}

/// <summary>Creates a new <see cref="SequentialWorkflowBuilder"/> with the given pipeline of <paramref name="agents"/>.</summary>
/// <param name="agents">The sequence of agents to compose into a sequential workflow.</param>
/// <returns>The builder for creating a sequential workflow.</returns>
public static SequentialWorkflowBuilder CreateSequentialBuilderWith(params IEnumerable<AIAgent> agents)
{
Throw.IfNull(agents);
return new SequentialWorkflowBuilder(agents);
}

/// <summary>Creates a new <see cref="ConcurrentWorkflowBuilder"/> with the given participating <paramref name="agents"/>.</summary>
/// <param name="agents">The set of agents to compose into a concurrent workflow.</param>
/// <returns>The builder for creating a concurrent workflow.</returns>
public static ConcurrentWorkflowBuilder CreateConcurrentBuilderWith(params IEnumerable<AIAgent> agents)
{
Throw.IfNull(agents);
return new ConcurrentWorkflowBuilder(agents);
}

/// <summary>Creates a new <see cref="MagenticWorkflowBuilder"/> with the given <paramref name="managerAgent"/>.</summary>
/// <param name="managerAgent">The LLM-powered manager agent that coordinates the team.</param>
/// <returns>The builder for creating a Magentic workflow.</returns>
[Experimental(DiagnosticConstants.ExperimentalFeatureDiagnostic)]
public static MagenticWorkflowBuilder CreateMagenticBuilderWith(AIAgent managerAgent)
{
Throw.IfNull(managerAgent);
return new MagenticWorkflowBuilder(managerAgent);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json.Serialization;
Expand All @@ -15,14 +16,14 @@ internal WorkflowInfo(
Dictionary<string, List<EdgeInfo>> edges,
HashSet<RequestPortInfo> requestPorts,
string startExecutorId,
HashSet<string>? outputExecutorIds)
Dictionary<string, HashSet<OutputTag>>? outputExecutorIds)
{
this.Executors = Throw.IfNull(executors);
this.Edges = Throw.IfNull(edges);
this.RequestPorts = Throw.IfNull(requestPorts);

this.StartExecutorId = Throw.IfNullOrEmpty(startExecutorId);
this.OutputExecutorIds = outputExecutorIds ?? [];
this.OutputExecutorIds = outputExecutorIds ?? new Dictionary<string, HashSet<OutputTag>>(StringComparer.Ordinal);
}

public Dictionary<string, ExecutorInfo> Executors { get; }
Expand All @@ -32,7 +33,15 @@ internal WorkflowInfo(
public TypeId? InputType { get; }
public string StartExecutorId { get; }

public HashSet<string> OutputExecutorIds { get; }
/// <summary>
/// Map of executor id to the set of <see cref="OutputTag"/>s under which the executor is registered.
/// An empty set means the executor is registered as a regular (untagged) output source.
/// JSON shape: <c>{ "executorId": ["intermediate"], ... }</c>. Legacy payloads using the
/// older <c>string[]</c> shape are read by <see cref="WorkflowInfoOutputExecutorsConverter"/> and
/// each id is treated as registered with an empty tag set.
/// </summary>
[JsonConverter(typeof(WorkflowInfoOutputExecutorsConverter))]
public Dictionary<string, HashSet<OutputTag>> OutputExecutorIds { get; }

public bool IsMatch(Workflow workflow)
{
Expand Down Expand Up @@ -80,9 +89,12 @@ public bool IsMatch(Workflow workflow)
return false;
}

// Validate the outputs
// Validate the outputs (key set + tag set per id must match)
if (workflow.OutputExecutors.Count != this.OutputExecutorIds.Count ||
this.OutputExecutorIds.Any(id => !workflow.OutputExecutors.Contains(id)))
this.OutputExecutorIds.Any(kvp =>
!workflow.OutputExecutors.TryGetValue(kvp.Key, out HashSet<OutputTag>? tags) ||
tags.Count != kvp.Value.Count ||
!tags.SetEquals(kvp.Value)))
{
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Microsoft.Agents.AI.Workflows.Checkpointing;

/// <summary>
/// JSON converter for <see cref="WorkflowInfo.OutputExecutorIds"/> that supports both the new
/// map shape (<c>{ "id": ["intermediate"] }</c>) and the legacy array shape
/// (<c>["id1", "id2"]</c>). Legacy-shaped payloads are read as if every id had been registered
/// as a regular (untagged) output source; output is always written in the new map shape.
/// </summary>
internal sealed class WorkflowInfoOutputExecutorsConverter : JsonConverter<Dictionary<string, HashSet<OutputTag>>>
{
public override Dictionary<string, HashSet<OutputTag>> Read(
ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
Dictionary<string, HashSet<OutputTag>> result = new(StringComparer.Ordinal);

if (reader.TokenType == JsonTokenType.Null)
{
return result;
}

if (reader.TokenType == JsonTokenType.StartArray)
{
// Legacy shape: a flat array of executor ids. Treat each as a registered
// (untagged) output executor.
while (reader.Read())
{
if (reader.TokenType == JsonTokenType.EndArray)
{
return result;
}

if (reader.TokenType != JsonTokenType.String)
{
throw new JsonException($"Expected a string in legacy outputExecutorIds array, got {reader.TokenType}.");
}

string id = reader.GetString()!;
result[id] = [];
}

throw new JsonException("Unexpected end of legacy outputExecutorIds array.");
}

if (reader.TokenType != JsonTokenType.StartObject)
{
throw new JsonException($"Expected object or array for outputExecutorIds, got {reader.TokenType}.");
}

while (reader.Read())
{
if (reader.TokenType == JsonTokenType.EndObject)
{
return result;
}

if (reader.TokenType != JsonTokenType.PropertyName)
{
throw new JsonException($"Expected property name in outputExecutorIds object, got {reader.TokenType}.");
}

string id = reader.GetString()!;
reader.Read();

HashSet<OutputTag> tags = [];
if (reader.TokenType == JsonTokenType.StartArray)
{
while (reader.Read() && reader.TokenType != JsonTokenType.EndArray)
{
if (reader.TokenType != JsonTokenType.String)
{
throw new JsonException($"Expected a string tag, got {reader.TokenType}.");
}

tags.Add(ReadTag(reader.GetString()!));
}
}
else
{
throw new JsonException($"Expected array of tags for outputExecutorIds[{id}], got {reader.TokenType}.");
}

result[id] = tags;
}

throw new JsonException("Unexpected end of outputExecutorIds object.");
}

private static OutputTag ReadTag(string value)
{
if (string.Equals(value, OutputTag.Intermediate.Value, StringComparison.Ordinal))
{
return OutputTag.Intermediate;
}
return new OutputTag(value);
}

public override void Write(
Utf8JsonWriter writer,
Dictionary<string, HashSet<OutputTag>> value,
JsonSerializerOptions options)
{
writer.WriteStartObject();
foreach (KeyValuePair<string, HashSet<OutputTag>> kvp in value)
{
writer.WritePropertyName(kvp.Key);
writer.WriteStartArray();
foreach (OutputTag tag in kvp.Value)
{
writer.WriteStringValue(tag.Value);
}
writer.WriteEndArray();
}
writer.WriteEndObject();
}
}
Loading
Loading