< Summary

Information
Class: Orchestrator.Commands.Observability.Experiments.PreparedExperimentRunExecutor
Assembly: Orchestrator
File(s): /home/runner/work/KicktippAi/KicktippAi/src/Orchestrator/Commands/Observability/Experiments/PreparedExperimentRunExecutor.cs
Line coverage
90%
Covered lines: 755
Uncovered lines: 79
Coverable lines: 834
Total lines: 1275
Line coverage: 90.5%
Branch coverage
63%
Covered branches: 150
Total branches: 236
Branch coverage: 63.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .cctor() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| ExecuteAsync() | 76.92% | 26 | 26 | 96.35% |
| ExecuteCommunityToDateAsync() | 80.77% | 26 | 26 | 97.39% |
| BuildBatches(...) | 75% | 6 | 4 | 50% |
| CreateRepeatedMatchSliceBatches(...) | 93.75% | 16 | 16 | 94.44% |
| ResolvePromptRouteAsync() | 20% | 61 | 10 | 20% |
| ApplyBatchingDefaults(...) | 62.5% | 8 | 8 | 100% |
| ExecuteItemAsync() | 57.69% | 27 | 26 | 90.09% |
| ExecuteCommunityItemAsync() | 43.75% | 16 | 16 | 97.14% |
| PostRunScoresAsync() | 100% | 1 | 1 | 100% |
| PostItemScoreAsync() | 50% | 2 | 2 | 100% |
| WaitForDatasetRunItemsAsync() | 50% | 4 | 4 | 73.33% |
| DeleteExistingRunIfRequestedAsync() | 50% | 2 | 2 | 75% |
| LoadOutcomeDictionary(...) | 100% | 2 | 2 | 100% |
| LoadOutcomesAsync() | 100% | 8 | 8 | 100% |
| ConfigureTraceContext(...) | 87.5% | 8 | 8 | 100% |
| SetTraceAndRootObservationInput(...) | 50% | 4 | 4 | 80% |
| SetExperimentItemExpectedOutput(...) | 50% | 4 | 4 | 75% |
| SetExperimentItemMetadata(...) | 50% | 4 | 4 | 75% |
| SetTraceAndRootObservationOutput(...) | 50% | 4 | 4 | 80% |
| SetExperimentRunId(...) | 50% | 4 | 4 | 75% |
| CreateExperimentItemInputJson(...) | 100% | 1 | 1 | 100% |
| CreateExperimentItemExpectedOutputJson(...) | 100% | 1 | 1 | 100% |
| CreateExperimentItemMetadataJson(...) | 100% | 1 | 1 | 100% |
| SelectParticipants(...) | 50% | 62 | 16 | 43.75% |
| BuildCommunityRunMetadata(...) | 35.71% | 14 | 14 | 100% |
| BuildCommunityRunFamilyName(...) | 0% | 6 | 2 | 0% |
| BuildCommunityParticipantRunName(...) | 100% | 1 | 1 | 100% |
| BuildRunTimestampToken(...) | 100% | 2 | 1 | 0% |
| CreateCommunityPredictionPayload(...) | 100% | 1 | 1 | 100% |
| CreatePredictionOrNull(...) | 100% | 4 | 4 | 100% |
| ConfigureCommunityPredictionObservation(...) | 75% | 4 | 4 | 95.24% |
| DeriveDatasetName(...) | 25% | 4 | 4 | 100% |
| GetCommunityContext(...) | 25% | 4 | 4 | 100% |
| BuildOutcomeKey(...) | 100% | 1 | 1 | 100% |
| IsWarmupBatchTask(...) | 100% | 2 | 2 | 100% |
| DescribeBatching(...) | 75% | 4 | 4 | 80% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |

File(s)

/home/runner/work/KicktippAi/KicktippAi/src/Orchestrator/Commands/Observability/Experiments/PreparedExperimentRunExecutor.cs

# | Line | Line coverage
 1using System.Diagnostics;
 2using System.Text.Json;
 3using EHonda.KicktippAi.Core;
 4using OpenAiIntegration;
 5using Orchestrator.Infrastructure.Factories;
 6using Orchestrator.Infrastructure.Langfuse;
 7using Match = EHonda.KicktippAi.Core.Match;
 8
 9namespace Orchestrator.Commands.Observability.Experiments;
 10
 11internal sealed class PreparedExperimentRunExecutor
 12{
 113    private static readonly JsonSerializerOptions TraceJsonOptions = new(JsonSerializerDefaults.Web)
 114    {
 115        PropertyNameCaseInsensitive = true,
 116        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
 117    };
 18
 19    private readonly IFirebaseServiceFactory _firebaseServiceFactory;
 20    private readonly IOpenAiServiceFactory _openAiServiceFactory;
 21    private readonly ILangfusePublicApiClient _langfuseClient;
 22
 123    public PreparedExperimentRunExecutor(
 124        IFirebaseServiceFactory firebaseServiceFactory,
 125        IOpenAiServiceFactory openAiServiceFactory,
 126        ILangfusePublicApiClient langfuseClient)
 27    {
 128        _firebaseServiceFactory = firebaseServiceFactory;
 129        _openAiServiceFactory = openAiServiceFactory;
 130        _langfuseClient = langfuseClient;
 131    }
 32
 33    public async Task<PreparedExperimentRunSummary> ExecuteAsync(
 34        string expectedTaskType,
 35        PreparedExperimentRunRequest request,
 36        CancellationToken cancellationToken)
 37    {
 138        var manifest = await PreparedExperimentCommandSupport.LoadJsonFileAsync<PreparedExperimentManifest>(
 139            request.ManifestPath,
 140            cancellationToken);
 141        PreparedExperimentCommandSupport.ValidateManifest(manifest);
 142        PreparedExperimentCommandSupport.EnsureTaskType(manifest, expectedTaskType);
 43
 144        var runMetadata = string.IsNullOrWhiteSpace(request.RunMetadataFile)
 145            ? PreparedExperimentSupport.BuildRunMetadata(manifest, request.Options)
 146            : PreparedExperimentCommandSupport.NormalizeRunMetadata(
 147                await PreparedExperimentCommandSupport.LoadJsonFileAsync<PreparedExperimentRunMetadata>(
 148                    request.RunMetadataFile,
 149                    cancellationToken),
 150                manifest,
 151                request.Options);
 152        runMetadata = ApplyBatchingDefaults(runMetadata, expectedTaskType);
 53
 154        var communityContext = GetCommunityContext(runMetadata, manifest);
 155        var datasetName = DeriveDatasetName(runMetadata, manifest);
 156        var explicitEvaluationTime = PreparedExperimentCommandSupport.ParseExplicitEvaluationTime(runMetadata);
 157        var evaluationTimestampPolicy = explicitEvaluationTime is null
 158            ? PreparedExperimentCommandSupport.ParseEvaluationTimestampPolicy(runMetadata)
 159            : null;
 160        var deletedExistingRun = await DeleteExistingRunIfRequestedAsync(
 161            datasetName,
 162            request.RunName,
 163            request.ReplaceRun,
 164            cancellationToken);
 65
 166        var predictionRepository = _firebaseServiceFactory.CreatePredictionRepository();
 167        var contextRepository = _firebaseServiceFactory.CreateContextRepository();
 168        var matchOutcomeRepository = _firebaseServiceFactory.CreateMatchOutcomeRepository();
 169        var promptRoute = await ResolvePromptRouteAsync(runMetadata, cancellationToken);
 170        if (promptRoute.TraceMetadata is { } promptTraceMetadata)
 71        {
 072            runMetadata = runMetadata with
 073            {
 074                LangfusePromptVersion = promptTraceMetadata.Version
 075            };
 76        }
 77
 178        var predictionServiceOptions = PredictionServiceOptions.FlexProcessingWithStandardFallback with
 179        {
 180            LangfusePromptTraceMetadata = promptRoute.TraceMetadata,
 181            ReasoningEffort = runMetadata.ReasoningEffort,
 182            MaxOutputTokenCount = runMetadata.MaxOutputTokenCount
 183                                  ?? PredictionServiceOptions.FlexProcessingWithStandardFallback.MaxOutputTokenCount
 184        };
 185        var predictionService = promptRoute.TemplateProvider is null
 186            ? _openAiServiceFactory.CreatePredictionService(
 187                request.Options.Model,
 188                predictionServiceOptions)
 189            : _openAiServiceFactory.CreatePredictionService(
 190                request.Options.Model,
 191                predictionServiceOptions,
 192                promptRoute.TemplateProvider);
 193        var reconstructionService = new MatchPromptReconstructionService(
 194            predictionRepository,
 195            contextRepository,
 196            promptRoute.TemplateProvider ?? new InstructionsTemplateProvider(PromptsFileProvider.Create()));
 97
 198        var outcomesByKey = await LoadOutcomesAsync(matchOutcomeRepository, communityContext, manifest, cancellationToke
 199        var experimentName = PreparedExperimentSupport.DeriveExperimentName(runMetadata, request.RunName);
 1100        var traceTags = PreparedExperimentSupport.DeriveTraceTags(runMetadata);
 1101        var propagatedMetadata = PreparedExperimentSupport.DerivePropagatedMetadata(runMetadata);
 1102        var runMetadataPayload = PreparedExperimentSupport.BuildLangfuseExperimentMetadata(
 1103            runMetadata,
 1104            experimentName,
 1105            request.RunName,
 1106            new Dictionary<string, string?>
 1107            {
 1108                ["openaiServiceTierStrategy"] = "flex-first-standard-fallback",
 1109                ["openaiReasoningEffort"] = runMetadata.ReasoningEffort
 1110            });
 1111        var batches = BuildBatches(manifest.Items, runMetadata, expectedTaskType);
 1112        var executionSummaries = new List<PreparedExperimentExecutionSummary>();
 1113        string? datasetRunId = null;
 1114        var completedExecutionCount = 0;
 115
 1116        PreparedExperimentSupport.ReportProgress(
 1117            $"Starting {expectedTaskType} run '{request.RunName}' for model '{request.Options.Model}' with sample size {
 118
 1119        for (var batchIndex = 0; batchIndex < batches.Count; batchIndex += 1)
 120        {
 1121            var batch = batches[batchIndex];
 1122            var batchStart = completedExecutionCount + 1;
 1123            var batchEnd = completedExecutionCount + batch.Count;
 124
 1125            PreparedExperimentSupport.ReportProgress(
 1126                $"Batch {batchIndex + 1}/{batches.Count}: executions {batchStart}-{batchEnd} of {manifest.Items.Count}."
 127
 1128            var batchResults = await Task.WhenAll(batch.Select(item => ExecuteItemAsync(
 1129                item,
 1130                request,
 1131                experimentName,
 1132                datasetName,
 1133                runMetadata,
 1134                explicitEvaluationTime,
 1135                evaluationTimestampPolicy,
 1136                predictionRepository,
 1137                reconstructionService,
 1138                predictionService,
 1139                outcomesByKey,
 1140                traceTags,
 1141                propagatedMetadata,
 1142                runMetadataPayload,
 1143                cancellationToken)));
 144
 1145            foreach (var batchResult in batchResults)
 146            {
 1147                datasetRunId ??= batchResult.DatasetRunId;
 1148                executionSummaries.Add(batchResult.Summary);
 149            }
 150
 1151            completedExecutionCount += batchResults.Length;
 1152            PreparedExperimentSupport.ReportProgress(
 1153                $"Completed batch {batchIndex + 1}/{batches.Count}: {completedExecutionCount}/{manifest.Items.Count} exe
 154        }
 155
 1156        if (string.IsNullOrWhiteSpace(datasetRunId))
 157        {
 0158            throw new InvalidOperationException($"Dataset run '{request.RunName}' did not return a datasetRunId.");
 159        }
 160
 1161        var aggregateScores = await PostRunScoresAsync(datasetRunId, runMetadata, executionSummaries, cancellationToken)
 1162        var datasetRun = await _langfuseClient.GetDatasetRunAsync(datasetName, request.RunName, cancellationToken)
 1163            ?? throw new InvalidOperationException(
 1164                $"Dataset run '{request.RunName}' could not be retrieved from dataset '{datasetName}'.");
 1165        var datasetRunItems = await WaitForDatasetRunItemsAsync(
 1166            datasetRun.DatasetId,
 1167            request.RunName,
 1168            manifest.Items.Count,
 1169            cancellationToken);
 170
 1171        return new PreparedExperimentRunSummary(
 1172            datasetName,
 1173            request.RunName,
 1174            request.RunName,
 1175            runMetadata.TaskType ?? expectedTaskType,
 1176            request.Options.Model,
 1177            deletedExistingRun,
 1178            manifest.Items.Count,
 1179            runMetadata.BatchStrategy ?? expectedTaskType,
 1180            runMetadata.BatchSize,
 1181            runMetadata.BatchCount,
 1182            runMetadata.Parallelism,
 1183            executionSummaries.Count,
 1184            1,
 1185            aggregateScores,
 1186            [new PreparedExperimentDatasetRunSummary(
 1187                1,
 1188                request.RunName,
 1189                datasetRunId,
 1190                datasetRunItems.Meta.TotalItems,
 1191                aggregateScores,
 1192                executionSummaries.FirstOrDefault(),
 1193                executionSummaries.LastOrDefault())],
 1194            executionSummaries.FirstOrDefault(),
 1195            executionSummaries.LastOrDefault());
 1196    }
 197
 198    public async Task<PreparedExperimentRunSummary> ExecuteCommunityToDateAsync(
 199        PreparedExperimentCommunityRunRequest request,
 200        CancellationToken cancellationToken)
 201    {
 1202        var manifest = await PreparedExperimentCommandSupport.LoadJsonFileAsync<PreparedExperimentManifest>(
 1203            request.ManifestPath,
 1204            cancellationToken);
 1205        PreparedExperimentCommandSupport.ValidateManifest(manifest);
 1206        PreparedExperimentCommandSupport.EnsureTaskType(manifest, "community-to-date");
 207
 1208        if (manifest.Participants.Count == 0)
 209        {
 0210            throw new InvalidOperationException("Community-to-date manifests must contain at least one participant.");
 211        }
 212
 1213        var datasetName = string.IsNullOrWhiteSpace(request.DatasetName)
 1214            ? manifest.SliceDatasetName
 1215            : request.DatasetName.Trim();
 1216        if (string.IsNullOrWhiteSpace(datasetName))
 217        {
 0218            throw new InvalidOperationException("No dataset name was provided for the community-to-date run.");
 219        }
 220
 1221        var startedAtUtc = ExperimentArtifactSupport.FormatStartedAtUtc(DateTimeOffset.UtcNow);
 1222        var batchSize = request.BatchSize;
 1223        var participants = SelectParticipants(manifest, request);
 1224        var runFamilyName = string.IsNullOrWhiteSpace(request.RunFamilyName)
 1225            ? BuildCommunityRunFamilyName(manifest, startedAtUtc)
 1226            : request.RunFamilyName.Trim();
 227
 1228        var datasetRunSummaries = new List<PreparedExperimentDatasetRunSummary>();
 1229        var executionSummaries = new List<PreparedExperimentExecutionSummary>();
 1230        var scoreEntries = new List<ExperimentItemScores>();
 1231        var deletedAnyExistingRun = false;
 232
 1233        PreparedExperimentSupport.ReportProgress(
 1234            $"Starting community-to-date run family '{runFamilyName}' with {participants.Count} participant run(s) and s
 235
 1236        for (var participantIndex = 0; participantIndex < participants.Count; participantIndex += 1)
 237        {
 1238            var participant = participants[participantIndex];
 1239            var runName = BuildCommunityParticipantRunName(runFamilyName, participant);
 1240            var runMetadata = BuildCommunityRunMetadata(manifest, participant, datasetName, startedAtUtc, batchSize);
 1241            var deletedExistingRun = await DeleteExistingRunIfRequestedAsync(
 1242                datasetName,
 1243                runName,
 1244                request.ReplaceRuns,
 1245                cancellationToken);
 1246            deletedAnyExistingRun |= deletedExistingRun;
 247
 1248            var traceTags = PreparedExperimentSupport.DeriveTraceTags(runMetadata);
 1249            var propagatedMetadata = PreparedExperimentSupport.DerivePropagatedMetadata(runMetadata);
 1250            var experimentName = runFamilyName;
 1251            var runMetadataPayload = PreparedExperimentSupport.BuildLangfuseExperimentMetadata(
 1252                runMetadata,
 1253                experimentName,
 1254                runName);
 1255            var predictionsBySourceDatasetItemId = participant.Predictions
 1256                .GroupBy(prediction => prediction.SourceDatasetItemId, StringComparer.Ordinal)
 1257                .ToDictionary(group => group.Key, group => group.First(), StringComparer.Ordinal);
 1258            var batches = PreparedExperimentSupport.CreateBatchChunks(manifest.Items, batchSize);
 1259            var participantScoreEntries = new List<ExperimentItemScores>();
 1260            var participantExecutionSummaries = new List<PreparedExperimentExecutionSummary>();
 1261            string? datasetRunId = null;
 1262            var completedExecutionCount = 0;
 263
 1264            PreparedExperimentSupport.ReportProgress(
 1265                $"Participant {participantIndex + 1}/{participants.Count}: starting run '{runName}' for '{participant.Di
 266
 1267            for (var batchIndex = 0; batchIndex < batches.Count; batchIndex += 1)
 268            {
 1269                var batch = batches[batchIndex];
 1270                var batchStart = completedExecutionCount + 1;
 1271                var batchEnd = completedExecutionCount + batch.Count;
 272
 1273                PreparedExperimentSupport.ReportProgress(
 1274                    $"Participant {participant.DisplayName}: batch {batchIndex + 1}/{batches.Count}, executions {batchSt
 275
 1276                var batchResults = await Task.WhenAll(batch.Select(item => ExecuteCommunityItemAsync(
 1277                    item,
 1278                    participant,
 1279                    predictionsBySourceDatasetItemId,
 1280                    runName,
 1281                    experimentName,
 1282                    request.RunDescription,
 1283                    datasetName,
 1284                    runMetadata,
 1285                    traceTags,
 1286                    propagatedMetadata,
 1287                    runMetadataPayload,
 1288                    cancellationToken)));
 289
 1290                foreach (var batchResult in batchResults)
 291                {
 1292                    datasetRunId ??= batchResult.DatasetRunId;
 1293                    participantScoreEntries.Add(batchResult.Summary.Scores);
 1294                    participantExecutionSummaries.Add(batchResult.Summary);
 295                }
 296
 1297                completedExecutionCount += batchResults.Length;
 298            }
 299
 1300            if (string.IsNullOrWhiteSpace(datasetRunId))
 301            {
 0302                throw new InvalidOperationException($"Dataset run '{runName}' did not return a datasetRunId.");
 303            }
 304
 1305            var aggregateScores = await PostRunScoresAsync(datasetRunId, runMetadata, participantExecutionSummaries, can
 1306            var datasetRun = await _langfuseClient.GetDatasetRunAsync(datasetName, runName, cancellationToken)
 1307                ?? throw new InvalidOperationException(
 1308                    $"Dataset run '{runName}' could not be retrieved from dataset '{datasetName}'.");
 1309            var datasetRunItems = await WaitForDatasetRunItemsAsync(
 1310                datasetRun.DatasetId,
 1311                runName,
 1312                manifest.Items.Count,
 1313                cancellationToken);
 314
 1315            datasetRunSummaries.Add(new PreparedExperimentDatasetRunSummary(
 1316                participantIndex + 1,
 1317                runName,
 1318                datasetRunId,
 1319                datasetRunItems.Meta.TotalItems,
 1320                aggregateScores,
 1321                participantExecutionSummaries.FirstOrDefault(),
 1322                participantExecutionSummaries.LastOrDefault()));
 1323            executionSummaries.AddRange(participantExecutionSummaries);
 1324            scoreEntries.AddRange(participantScoreEntries);
 1325        }
 326
 1327        var overallAggregateScores = PreparedExperimentSupport.SummarizeScores(scoreEntries);
 1328        return new PreparedExperimentRunSummary(
 1329            datasetName,
 1330            runFamilyName,
 1331            runFamilyName,
 1332            "community-to-date",
 1333            "community-predictions",
 1334            deletedAnyExistingRun,
 1335            manifest.Items.Count,
 1336            "simple-batched",
 1337            batchSize,
 1338            null,
 1339            null,
 1340            executionSummaries.Count,
 1341            datasetRunSummaries.Count,
 1342            overallAggregateScores,
 1343            datasetRunSummaries,
 1344            executionSummaries.FirstOrDefault(),
 1345            executionSummaries.LastOrDefault());
 1346    }
 347
 348    private IReadOnlyList<IReadOnlyList<PreparedExperimentManifestItem>> BuildBatches(
 349        IReadOnlyList<PreparedExperimentManifestItem> items,
 350        PreparedExperimentRunMetadata runMetadata,
 351        string expectedTaskType)
 352    {
 1353        if (string.Equals(expectedTaskType, "repeated-match-slice", StringComparison.OrdinalIgnoreCase))
 354        {
 0355            return CreateRepeatedMatchSliceBatches(
 0356                items,
 0357                runMetadata.BatchCount ?? 3,
 0358                runMetadata.Parallelism ?? 5);
 359        }
 360
 1361        return IsWarmupBatchTask(expectedTaskType)
 1362            ? PreparedExperimentSupport.CreateWarmupThenBatchChunks(items, runMetadata.BatchCount ?? 3)
 1363            : PreparedExperimentSupport.CreateBatchChunks(items, runMetadata.BatchSize ?? 10);
 364    }
 365
 366    internal static IReadOnlyList<IReadOnlyList<PreparedExperimentManifestItem>> CreateRepeatedMatchSliceBatches(
 367        IReadOnlyList<PreparedExperimentManifestItem> items,
 368        int batchCount,
 369        int parallelism)
 370    {
 1371        if (parallelism < 1)
 372        {
 0373            throw new ArgumentOutOfRangeException(nameof(parallelism), parallelism, "Parallelism must be at least 1.");
 374        }
 375
 1376        var fixtureWorkflows = items
 1377            .GroupBy(item => item.SourceDatasetItemId, StringComparer.Ordinal)
 1378            .OrderBy(group => group.Min(item => item.FixtureIndex ?? int.MaxValue))
 1379            .ThenBy(group => group.Key, StringComparer.Ordinal)
 1380            .Select(group =>
 1381                PreparedExperimentSupport.CreateWarmupThenBatchChunks(
 1382                    group
 1383                        .OrderBy(item => item.RepetitionIndex ?? int.MaxValue)
 1384                        .ThenBy(item => item.SliceDatasetItemId, StringComparer.Ordinal)
 1385                        .ToList(),
 1386                    batchCount))
 1387            .ToList();
 388
 1389        var batches = new List<IReadOnlyList<PreparedExperimentManifestItem>>();
 1390        for (var workflowStart = 0; workflowStart < fixtureWorkflows.Count; workflowStart += parallelism)
 391        {
 1392            var workflowGroup = fixtureWorkflows
 1393                .Skip(workflowStart)
 1394                .Take(parallelism)
 1395                .ToList();
 1396            var maxWorkflowBatchCount = workflowGroup.Max(workflow => workflow.Count);
 1397            for (var workflowBatchIndex = 0; workflowBatchIndex < maxWorkflowBatchCount; workflowBatchIndex += 1)
 398            {
 1399                var batch = workflowGroup
 1400                    .Where(workflow => workflowBatchIndex < workflow.Count)
 1401                    .SelectMany(workflow => workflow[workflowBatchIndex])
 1402                    .ToList();
 1403                if (batch.Count > 0)
 404                {
 1405                    batches.Add(batch);
 406                }
 407            }
 408        }
 409
 1410        return batches;
 411    }
 412
 413    private async Task<ExperimentPromptRoute> ResolvePromptRouteAsync(
 414        PreparedExperimentRunMetadata runMetadata,
 415        CancellationToken cancellationToken)
 416    {
 1417        var promptSource = string.IsNullOrWhiteSpace(runMetadata.PromptSource)
 1418            ? "local"
 1419            : runMetadata.PromptSource.Trim().ToLowerInvariant();
 420
 1421        if (promptSource == "local")
 422        {
 1423            return new ExperimentPromptRoute(null, null);
 424        }
 425
 0426        if (promptSource != "langfuse")
 427        {
 0428            throw new InvalidOperationException($"Unsupported prompt source '{runMetadata.PromptSource}'.");
 429        }
 430
 0431        if (runMetadata.IncludeJustification)
 432        {
 0433            throw new InvalidOperationException(
 0434                "The Langfuse prompt source POC only supports match prompts without justification.");
 435        }
 436
 0437        if (string.IsNullOrWhiteSpace(runMetadata.LangfusePromptName))
 438        {
 0439            throw new InvalidOperationException("Run metadata must contain langfusePromptName when promptSource is langf
 440        }
 441
 0442        var prompt = await _langfuseClient.GetPromptAsync(
 0443                         runMetadata.LangfusePromptName,
 0444                         runMetadata.LangfusePromptLabel,
 0445                         runMetadata.LangfusePromptVersion,
 0446                         cancellationToken)
 0447                     ?? throw new FileNotFoundException(
 0448                         $"Langfuse prompt '{runMetadata.LangfusePromptName}' was not found.");
 449
 0450        _ = prompt.GetTextPrompt();
 0451        var templateProvider = new LangfuseTextPromptTemplateProvider(
 0452            _langfuseClient,
 0453            runMetadata.LangfusePromptName,
 0454            runMetadata.LangfusePromptLabel,
 0455            runMetadata.LangfusePromptVersion,
 0456            prompt);
 457
 0458        return new ExperimentPromptRoute(
 0459            templateProvider,
 0460            new LangfusePromptTraceMetadata(prompt.Name, prompt.Version));
 1461    }
 462
 463    private static PreparedExperimentRunMetadata ApplyBatchingDefaults(
 464        PreparedExperimentRunMetadata runMetadata,
 465        string expectedTaskType)
 466    {
 1467        if (IsWarmupBatchTask(expectedTaskType))
 468        {
 1469            return runMetadata with
 1470            {
 1471                BatchStrategy = string.IsNullOrWhiteSpace(runMetadata.BatchStrategy)
 1472                    ? "warmup-plus-batches"
 1473                    : runMetadata.BatchStrategy,
 1474                BatchCount = runMetadata.BatchCount ?? 3,
 1475                BatchSize = null,
 1476                Parallelism = string.Equals(expectedTaskType, "repeated-match-slice", StringComparison.OrdinalIgnoreCase
 1477                    ? runMetadata.Parallelism ?? 5
 1478                    : runMetadata.Parallelism
 1479            };
 480        }
 481
 1482        return runMetadata with
 1483        {
 1484            BatchStrategy = string.IsNullOrWhiteSpace(runMetadata.BatchStrategy)
 1485                ? "simple-batched"
 1486                : runMetadata.BatchStrategy,
 1487            BatchSize = runMetadata.BatchSize ?? 10,
 1488            BatchCount = null,
 1489            Parallelism = null
 1490        };
 491    }
 492
 493    private async Task<PreparedExperimentExecutionResult> ExecuteItemAsync(
 494        PreparedExperimentManifestItem item,
 495        PreparedExperimentRunRequest request,
 496        string experimentName,
 497        string datasetName,
 498        PreparedExperimentRunMetadata runMetadata,
 499        DateTimeOffset? explicitEvaluationTime,
 500        EvaluationTimestampPolicy? evaluationTimestampPolicy,
 501        IPredictionRepository predictionRepository,
 502        MatchPromptReconstructionService reconstructionService,
 503        IPredictionService predictionService,
 504        IReadOnlyDictionary<string, PersistedMatchOutcome> outcomesByKey,
 505        IReadOnlyList<string> traceTags,
 506        IReadOnlyDictionary<string, string> propagatedMetadata,
 507        JsonElement runMetadataPayload,
 508        CancellationToken cancellationToken)
 509    {
 1510        var outcomeKey = BuildOutcomeKey(item.HomeTeam, item.AwayTeam, item.Matchday);
 1511        if (!outcomesByKey.TryGetValue(outcomeKey, out var outcome))
 512        {
 0513            throw new InvalidOperationException(
 0514                $"No persisted match outcome was found for {item.HomeTeam} vs {item.AwayTeam} on matchday {item.Matchday
 515        }
 516
 1517        if (!outcome.HasOutcome || outcome.HomeGoals is null || outcome.AwayGoals is null)
 518        {
 0519            throw new InvalidOperationException(
 0520                $"The selected match does not have a completed persisted outcome yet: {item.HomeTeam} vs {item.AwayTeam}
 521        }
 522
 1523        var storedMatch = await predictionRepository.GetStoredMatchAsync(
 1524            item.HomeTeam,
 1525            item.AwayTeam,
 1526            item.Matchday,
 1527            (PredictionModelConfig?)null,
 1528            null,
 1529            cancellationToken);
 530
 1531        var promptMatch = storedMatch is null
 1532            ? ExperimentArtifactSupport.RehydrateForPromptOutput(new Match(item.HomeTeam, item.AwayTeam, outcome.StartsA
 1533            : ExperimentArtifactSupport.RehydrateForPromptOutput(storedMatch);
 1534        var evaluationTimestamp = explicitEvaluationTime
 1535            ?? EvaluationTimestampResolver.Resolve(
 1536                promptMatch,
 1537                evaluationTimestampPolicy ?? throw new InvalidOperationException(
 1538                    "Run metadata must contain either evaluationTime or evaluationTimestampPolicy."));
 1539        var selection = MatchContextDocumentCatalog.ForMatch(item.HomeTeam, item.AwayTeam, runMetadata.CommunityContext!
 1540        var reconstructedPrompt = await reconstructionService.ReconstructMatchPredictionPromptAtTimestampAsync(
 1541            promptMatch,
 1542            request.Options.Model,
 1543            runMetadata.CommunityContext!,
 1544            evaluationTimestamp,
 1545            selection.RequiredDocumentNames,
 1546            selection.OptionalDocumentNames,
 1547            runMetadata.IncludeJustification,
 1548            cancellationToken);
 549
 1550        var contextDocuments = reconstructedPrompt.ResolvedContextDocuments
 1551            .Select(document => new DocumentContext(document.DocumentName, document.Content))
 1552            .ToList();
 1553        var telemetryMetadata = new PredictionTelemetryMetadata(
 1554            HomeTeam: item.HomeTeam,
 1555            AwayTeam: item.AwayTeam,
 1556            RepredictionIndex: 0);
 557
 1558        using var activity = Telemetry.Source.StartActivity("experiment-item-run");
 1559        ConfigureTraceContext(
 1560            activity,
 1561            request.RunName,
 1562            experimentName,
 1563            request.RunDescription,
 1564            datasetName,
 1565            runMetadata,
 1566            item,
 1567            outcome.TippSpielId,
 1568            traceTags,
 1569            propagatedMetadata,
 1570            evaluationTimestamp,
 1571            predictionService.GetMatchPromptPath(runMetadata.IncludeJustification));
 572
 1573        SetExperimentItemMetadata(activity, CreateExperimentItemMetadataJson(item));
 1574        SetExperimentItemExpectedOutput(
 1575            activity,
 1576            CreateExperimentItemExpectedOutputJson(outcome.HomeGoals.Value, outcome.AwayGoals.Value));
 1577        SetTraceAndRootObservationInput(activity, CreateExperimentItemInputJson(item));
 578
 1579        var traceId = activity?.TraceId.ToString();
 1580        if (string.IsNullOrWhiteSpace(traceId))
 581        {
 0582            throw new InvalidOperationException(
 0583                $"Trace creation failed for {item.HomeTeam} vs {item.AwayTeam}; no trace id was available.");
 584        }
 585
 1586        var datasetRunItem = await _langfuseClient.CreateDatasetRunItemAsync(
 1587            new LangfuseCreateDatasetRunItemRequest(
 1588                request.RunName,
 1589                item.SliceDatasetItemId,
 1590                traceId,
 1591                request.RunDescription,
 1592                runMetadataPayload,
 1593                activity?.SpanId.ToString()),
 1594            cancellationToken);
 1595        SetExperimentRunId(activity, datasetRunItem.DatasetRunId);
 596
 1597        var prediction = await predictionService.PredictMatchAsync(
 1598            promptMatch,
 1599            contextDocuments,
 1600            runMetadata.IncludeJustification,
 1601            telemetryMetadata,
 1602            cancellationToken);
 603
 1604        if (prediction is null)
 605        {
 0606            SetTraceAndRootObservationOutput(
 0607                activity,
 0608                JsonSerializer.Serialize(new { error = "Failed to generate prediction" }, TraceJsonOptions));
 0609            throw new InvalidOperationException(
 0610                $"Failed to generate prediction for {item.HomeTeam} vs {item.AwayTeam} on matchday {item.Matchday}.");
 611        }
 612
 1613        SetTraceAndRootObservationOutput(activity, JsonSerializer.Serialize(prediction, TraceJsonOptions));
 614
 1615        var itemScores = PreparedExperimentSupport.CalculateScores(prediction, outcome.HomeGoals.Value, outcome.AwayGoal
 1616        await PostItemScoreAsync(
 1617            datasetRunItem.DatasetRunId,
 1618            datasetName,
 1619            request.RunName,
 1620            experimentName,
 1621            runMetadata,
 1622            item,
 1623            traceId,
 1624            activity?.SpanId.ToString(),
 1625            itemScores,
 1626            cancellationToken);
 627
 1628        return new PreparedExperimentExecutionResult(
 1629            datasetRunItem.DatasetRunId,
 1630            new PreparedExperimentExecutionSummary(
 1631                item.SliceDatasetItemId,
 1632                item.SourceDatasetItemId,
 1633                request.RunName,
 1634                traceId,
 1635                prediction,
 1636                itemScores,
 1637                traceTags,
 1638                null,
 1639                "placed",
 1640                item.FixtureIndex,
 1641                item.RepetitionIndex));
 1642    }
 643
 644    private async Task<PreparedExperimentExecutionResult> ExecuteCommunityItemAsync(
 645        PreparedExperimentManifestItem item,
 646        PreparedExperimentParticipantManifest participant,
 647        IReadOnlyDictionary<string, PreparedExperimentParticipantPrediction> predictionsBySourceDatasetItemId,
 648        string runName,
 649        string experimentName,
 650        string? runDescription,
 651        string datasetName,
 652        PreparedExperimentRunMetadata runMetadata,
 653        IReadOnlyList<string> traceTags,
 654        IReadOnlyDictionary<string, string> propagatedMetadata,
 655        JsonElement runMetadataPayload,
 656        CancellationToken cancellationToken)
 657    {
 1658        var participantPrediction = predictionsBySourceDatasetItemId.TryGetValue(item.SourceDatasetItemId, out var predi
 1659            ? prediction
 1660            : new PreparedExperimentParticipantPrediction
 1661            {
 1662                SourceDatasetItemId = item.SourceDatasetItemId,
 1663                Status = "missed",
 1664                KicktippPoints = 0
 1665            };
 666
 1667        var predictionPayload = CreateCommunityPredictionPayload(participantPrediction);
 668
 1669        using var activity = Telemetry.Source.StartActivity("experiment-item-run");
 1670        ConfigureTraceContext(
 1671            activity,
 1672            runName,
 1673            experimentName,
 1674            runDescription,
 1675            datasetName,
 1676            runMetadata,
 1677            item,
 1678            item.TippSpielId,
 1679            traceTags,
 1680            propagatedMetadata);
 681
 1682        SetExperimentItemMetadata(activity, CreateExperimentItemMetadataJson(item));
 1683        SetTraceAndRootObservationInput(activity, CreateExperimentItemInputJson(item));
 1684        SetTraceAndRootObservationOutput(activity, predictionPayload.GetRawText());
 685
 1686        var traceId = activity?.TraceId.ToString();
 1687        if (string.IsNullOrWhiteSpace(traceId))
 688        {
 0689            throw new InvalidOperationException(
 0690                $"Trace creation failed for {item.HomeTeam} vs {item.AwayTeam}; no trace id was available.");
 691        }
 692
 1693        var datasetRunItem = await _langfuseClient.CreateDatasetRunItemAsync(
 1694            new LangfuseCreateDatasetRunItemRequest(
 1695                runName,
 1696                item.SliceDatasetItemId,
 1697                traceId,
 1698                runDescription,
 1699                runMetadataPayload,
 1700                activity?.SpanId.ToString()),
 1701            cancellationToken);
 1702        SetExperimentRunId(activity, datasetRunItem.DatasetRunId);
 703
 1704        string? predictionObservationId = null;
 1705        using (var observation = Telemetry.Source.StartActivity(runMetadata.ObservationName ?? "community-match-predicti
 706        {
 1707            predictionObservationId = observation?.SpanId.ToString();
 1708            ConfigureCommunityPredictionObservation(observation, participant, participantPrediction, item, predictionPay
 1709        }
 710
 1711        var itemScores = new ExperimentItemScores(participantPrediction.KicktippPoints);
 1712        await PostItemScoreAsync(
 1713            datasetRunItem.DatasetRunId,
 1714            datasetName,
 1715            runName,
 1716            experimentName,
 1717            runMetadata,
 1718            item,
 1719            traceId,
 1720            predictionObservationId ?? activity?.SpanId.ToString(),
 1721            itemScores,
 1722            cancellationToken);
 723
 1724        return new PreparedExperimentExecutionResult(
 1725            datasetRunItem.DatasetRunId,
 1726            new PreparedExperimentExecutionSummary(
 1727                item.SliceDatasetItemId,
 1728                item.SourceDatasetItemId,
 1729                runName,
 1730                traceId,
 1731                CreatePredictionOrNull(participantPrediction),
 1732                itemScores,
 1733                traceTags,
 1734                null,
 1735                participantPrediction.Status,
 1736                item.FixtureIndex,
 1737                item.RepetitionIndex));
 1738    }
 739
 740    private async Task<ExperimentAggregateScores> PostRunScoresAsync(
 741        string datasetRunId,
 742        PreparedExperimentRunMetadata runMetadata,
 743        IReadOnlyList<PreparedExperimentExecutionSummary> executionSummaries,
 744        CancellationToken cancellationToken)
 745    {
 1746        var aggregateScores = PreparedExperimentSupport.SummarizeExecutionScores(
 1747            executionSummaries,
 1748            runMetadata.TaskType);
 1749        var runMetadataPayload = JsonSerializer.SerializeToElement(runMetadata, PreparedExperimentCommandSupport.JsonOpt
 750
 1751        await _langfuseClient.CreateScoreAsync(
 1752            new LangfuseCreateScoreRequest(
 1753                "total_kicktipp_points",
 1754                aggregateScores.TotalKicktippPoints,
 1755                DatasetRunId: datasetRunId,
 1756                Comment: $"Aggregate score for {runMetadata.SampleSize} item(s)",
 1757                Id: PreparedExperimentSupport.CreateScoreId("total_kicktipp_points", datasetRunId),
 1758                Metadata: runMetadataPayload,
 1759                Environment: "sdk-experiment"),
 1760            cancellationToken);
 761
 1762        await _langfuseClient.CreateScoreAsync(
 1763            new LangfuseCreateScoreRequest(
 1764                "avg_kicktipp_points",
 1765                aggregateScores.AvgKicktippPoints,
 1766                DatasetRunId: datasetRunId,
 1767                Comment: $"Aggregate score for {runMetadata.SampleSize} item(s)",
 1768                Id: PreparedExperimentSupport.CreateScoreId("avg_kicktipp_points", datasetRunId),
 1769                Metadata: runMetadataPayload,
 1770                Environment: "sdk-experiment"),
 1771            cancellationToken);
 772
 1773        return aggregateScores;
 1774    }
 775
 776    private async Task PostItemScoreAsync(
 777        string datasetRunId,
 778        string datasetName,
 779        string runName,
 780        string experimentName,
 781        PreparedExperimentRunMetadata runMetadata,
 782        PreparedExperimentManifestItem item,
 783        string traceId,
 784        string? observationId,
 785        ExperimentItemScores itemScores,
 786        CancellationToken cancellationToken)
 787    {
 1788        var metadata = JsonSerializer.SerializeToElement(new
 1789        {
 1790            datasetRunId,
 1791            datasetRunName = runName,
 1792            datasetName,
 1793            datasetItemId = item.SliceDatasetItemId,
 1794            sourceDatasetItemId = item.SourceDatasetItemId,
 1795            experiment_name = experimentName,
 1796            experiment_run_name = runName,
 1797            task = runMetadata.TaskType,
 1798            runSubjectKind = runMetadata.RunSubjectKind,
 1799            runSubjectId = runMetadata.RunSubjectId,
 1800            runSubjectDisplayName = runMetadata.RunSubjectDisplayName,
 1801            reasoningEffort = runMetadata.ReasoningEffort,
 1802            item.HomeTeam,
 1803            item.AwayTeam,
 1804            item.Matchday,
 1805            item.TippSpielId,
 1806            item.FixtureIndex,
 1807            item.RepetitionIndex
 1808        }, PreparedExperimentCommandSupport.JsonOptions);
 809
 1810        await _langfuseClient.CreateScoreAsync(
 1811            new LangfuseCreateScoreRequest(
 1812                "kicktipp_points",
 1813                itemScores.KicktippPoints,
 1814                TraceId: traceId,
 1815                ObservationId: string.IsNullOrWhiteSpace(observationId) ? null : observationId,
 1816                DataType: "NUMERIC",
 1817                Comment: $"Item score for {item.HomeTeam} vs {item.AwayTeam}",
 1818                Id: PreparedExperimentSupport.CreateScoreId(
 1819                    "kicktipp_points",
 1820                    datasetRunId,
 1821                    traceId,
 1822                    observationId,
 1823                    item.SliceDatasetItemId),
 1824                Metadata: metadata,
 1825                Environment: "sdk-experiment"),
 1826            cancellationToken);
 1827    }
 828
 829    private async Task<LangfusePaginatedResponse<LangfuseDatasetRunItem>> WaitForDatasetRunItemsAsync(
 830        string datasetId,
 831        string runName,
 832        int expectedCount,
 833        CancellationToken cancellationToken)
 834    {
 1835        var limit = Math.Min(100, Math.Max(1, expectedCount));
 836
 1837        for (var attempt = 0; attempt < 6; attempt += 1)
 838        {
 1839            var datasetRunItems = await _langfuseClient.ListDatasetRunItemsAsync(
 1840                datasetId,
 1841                runName,
 1842                1,
 1843                limit,
 1844                cancellationToken);
 845
 1846            if (datasetRunItems.Meta.TotalItems >= expectedCount)
 847            {
 1848                return datasetRunItems;
 849            }
 850
 0851            PreparedExperimentSupport.ReportProgress(
 0852                $"Waiting for Langfuse dataset run items for '{runName}': {datasetRunItems.Meta.TotalItems}/{expectedCou
 0853            await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);
 854        }
 855
 0856        return await _langfuseClient.ListDatasetRunItemsAsync(datasetId, runName, 1, limit, cancellationToken);
 1857    }
 858
 859    private async Task<bool> DeleteExistingRunIfRequestedAsync(
 860        string datasetName,
 861        string runName,
 862        bool replaceRun,
 863        CancellationToken cancellationToken)
 864    {
 1865        if (!replaceRun)
 866        {
 1867            return false;
 868        }
 869
 0870        return await _langfuseClient.DeleteDatasetRunAsync(datasetName, runName, cancellationToken);
 1871    }
 872
 873    private static IReadOnlyDictionary<string, PersistedMatchOutcome> LoadOutcomeDictionary(
 874        IEnumerable<PersistedMatchOutcome> outcomes)
 875    {
 1876        return outcomes.ToDictionary(
 1877            outcome => BuildOutcomeKey(outcome.HomeTeam, outcome.AwayTeam, outcome.Matchday),
 1878            StringComparer.OrdinalIgnoreCase);
 879    }
 880
 881    private static async Task<IReadOnlyDictionary<string, PersistedMatchOutcome>> LoadOutcomesAsync(
 882        IMatchOutcomeRepository matchOutcomeRepository,
 883        string communityContext,
 884        PreparedExperimentManifest manifest,
 885        CancellationToken cancellationToken)
 886    {
 1887        var dictionary = new Dictionary<string, PersistedMatchOutcome>(StringComparer.OrdinalIgnoreCase);
 888
 1889        foreach (var matchday in manifest.Items.Select(item => item.Matchday).Distinct().OrderBy(matchday => matchday))
 890        {
 1891            var outcomes = await matchOutcomeRepository.GetMatchdayOutcomesAsync(matchday, communityContext, cancellatio
 1892            foreach (var pair in LoadOutcomeDictionary(outcomes))
 893            {
 1894                dictionary[pair.Key] = pair.Value;
 895            }
 896        }
 897
 1898        return dictionary;
 1899    }
 900
 901    private static void ConfigureTraceContext(
 902        Activity? activity,
 903        string runName,
 904        string experimentName,
 905        string? runDescription,
 906        string datasetName,
 907        PreparedExperimentRunMetadata runMetadata,
 908        PreparedExperimentManifestItem item,
 909        string? tippSpielId,
 910        IReadOnlyList<string> traceTags,
 911        IReadOnlyDictionary<string, string> propagatedMetadata,
 912        DateTimeOffset? evaluationTimestamp = null,
 913        string? promptTemplatePath = null)
 914    {
 1915        activity?.SetTag("langfuse.trace.name", "experiment-item-run");
 1916        LangfuseActivityPropagation.SetEnvironment(activity, "sdk-experiment");
 1917        LangfuseActivityPropagation.SetSessionId(activity, runName);
 1918        LangfuseActivityPropagation.SetTraceTags(activity, traceTags);
 919
 1920        LangfuseActivityPropagation.SetExperimentName(activity, runName);
 1921        LangfuseActivityPropagation.SetExperimentDescription(activity, runDescription);
 1922        LangfuseActivityPropagation.SetExperimentItemId(activity, item.SliceDatasetItemId);
 1923        LangfuseActivityPropagation.SetExperimentItemRootObservationId(activity, activity?.SpanId.ToString());
 924
 1925        foreach (var metadata in propagatedMetadata)
 926        {
 1927            LangfuseActivityPropagation.SetTraceMetadata(activity, metadata.Key, metadata.Value);
 928        }
 929
 1930        LangfuseActivityPropagation.SetTraceMetadata(activity, "experiment_name", experimentName);
 1931        LangfuseActivityPropagation.SetTraceMetadata(activity, "experiment_run_name", runName);
 1932        LangfuseActivityPropagation.SetTraceMetadata(activity, "datasetName", datasetName, propagateToObservations: fals
 1933        LangfuseActivityPropagation.SetTraceMetadata(activity, "datasetItemId", item.SliceDatasetItemId, propagateToObse
 1934        LangfuseActivityPropagation.SetTraceMetadata(activity, "dataset_item_id", item.SliceDatasetItemId, propagateToOb
 1935        LangfuseActivityPropagation.SetTraceMetadata(activity, "sourceDatasetItemId", item.SourceDatasetItemId, propagat
 1936        LangfuseActivityPropagation.SetTraceMetadata(activity, "community", runMetadata.CommunityContext, propagateToObs
 1937        LangfuseActivityPropagation.SetTraceMetadata(activity, "matchday", item.Matchday.ToString(), propagateToObservat
 1938        LangfuseActivityPropagation.SetTraceMetadata(activity, "selectedMatch", $"{item.HomeTeam} vs {item.AwayTeam}", p
 1939        LangfuseActivityPropagation.SetTraceMetadata(
 1940            activity,
 1941            "homeTeams",
 1942            PredictionTelemetryMetadata.BuildDelimitedFilterValue([item.HomeTeam]),
 1943            propagateToObservations: false);
 1944        LangfuseActivityPropagation.SetTraceMetadata(
 1945            activity,
 1946            "awayTeams",
 1947            PredictionTelemetryMetadata.BuildDelimitedFilterValue([item.AwayTeam]),
 1948            propagateToObservations: false);
 1949        LangfuseActivityPropagation.SetTraceMetadata(
 1950            activity,
 1951            "teams",
 1952            PredictionTelemetryMetadata.BuildDelimitedFilterValue([item.HomeTeam, item.AwayTeam]),
 1953            propagateToObservations: false);
 1954        if (evaluationTimestamp is not null)
 955        {
 1956            LangfuseActivityPropagation.SetTraceMetadata(
 1957                activity,
 1958                "evaluationTimestamp",
 1959                evaluationTimestamp.Value.ToString("O"),
 1960                propagateToObservations: false);
 961        }
 962
 1963        LangfuseActivityPropagation.SetTraceMetadata(activity, "tippSpielId", tippSpielId, propagateToObservations: fals
 1964        LangfuseActivityPropagation.SetTraceMetadata(activity, "promptTemplatePath", promptTemplatePath, propagateToObse
 1965    }
 966
 967    private static void SetTraceAndRootObservationInput(Activity? activity, string inputJson)
 968    {
 1969        if (activity is null || string.IsNullOrWhiteSpace(inputJson))
 970        {
 0971            return;
 972        }
 973
 1974        activity.SetTag("langfuse.trace.input", inputJson);
 1975        activity.SetTag("langfuse.observation.input", inputJson);
 1976    }
 977
 978    private static void SetExperimentItemExpectedOutput(Activity? activity, string expectedOutputJson)
 979    {
 1980        if (activity is null || string.IsNullOrWhiteSpace(expectedOutputJson))
 981        {
 0982            return;
 983        }
 984
 1985        activity.SetTag("langfuse.experiment.item.expected_output", expectedOutputJson);
 1986    }
 987
 988    private static void SetExperimentItemMetadata(Activity? activity, string metadataJson)
 989    {
 1990        if (activity is null || string.IsNullOrWhiteSpace(metadataJson))
 991        {
 0992            return;
 993        }
 994
 1995        activity.SetTag("langfuse.experiment.item.metadata", metadataJson);
 1996    }
 997
 998    private static void SetTraceAndRootObservationOutput(Activity? activity, string outputJson)
 999    {
 11000        if (activity is null || string.IsNullOrWhiteSpace(outputJson))
 1001        {
 01002            return;
 1003        }
 1004
 11005        activity.SetTag("langfuse.trace.output", outputJson);
 11006        activity.SetTag("langfuse.observation.output", outputJson);
 11007    }
 1008
 1009    private static void SetExperimentRunId(Activity? activity, string datasetRunId)
 1010    {
 11011        if (activity is null || string.IsNullOrWhiteSpace(datasetRunId))
 1012        {
 01013            return;
 1014        }
 1015
 11016        LangfuseActivityPropagation.SetExperimentRunId(activity, datasetRunId);
 11017    }
 1018
 1019    private static string CreateExperimentItemInputJson(PreparedExperimentManifestItem item)
 1020    {
 11021        return JsonSerializer.Serialize(new
 11022        {
 11023            fixture = $"{item.HomeTeam} vs {item.AwayTeam}",
 11024            item.StartsAt
 11025        }, TraceJsonOptions);
 1026    }
 1027
 1028    private static string CreateExperimentItemExpectedOutputJson(int homeGoals, int awayGoals)
 1029    {
 11030        return JsonSerializer.Serialize(new
 11031        {
 11032            score = $"{homeGoals}:{awayGoals}"
 11033        }, TraceJsonOptions);
 1034    }
 1035
 1036    private static string CreateExperimentItemMetadataJson(PreparedExperimentManifestItem item)
 1037    {
 11038        return JsonSerializer.Serialize(new
 11039        {
 11040            item.SourceDatasetItemId,
 11041            item.SliceDatasetItemId,
 11042            item.HomeTeam,
 11043            item.AwayTeam,
 11044            item.Matchday,
 11045            item.TippSpielId,
 11046            item.FixtureIndex,
 11047            item.RepetitionIndex
 11048        }, TraceJsonOptions);
 1049    }
 1050
 1051    private static IReadOnlyList<PreparedExperimentParticipantManifest> SelectParticipants(
 1052        PreparedExperimentManifest manifest,
 1053        PreparedExperimentCommunityRunRequest request)
 1054    {
 11055        var participants = manifest.Participants
 11056            .OrderBy(participant => participant.DisplayName, StringComparer.OrdinalIgnoreCase)
 11057            .ThenBy(participant => participant.ParticipantId, StringComparer.Ordinal)
 11058            .ToList();
 1059
 11060        if (request.ParticipantIds.Count > 0)
 1061        {
 01062            var filteredParticipants = participants
 01063                .Where(participant => request.ParticipantIds.Contains(participant.ParticipantId))
 01064                .ToList();
 01065            var missingParticipantIds = request.ParticipantIds
 01066                .Except(filteredParticipants.Select(participant => participant.ParticipantId), StringComparer.Ordinal)
 01067                .OrderBy(participantId => participantId, StringComparer.Ordinal)
 01068                .ToList();
 01069            if (missingParticipantIds.Count > 0)
 1070            {
 01071                throw new InvalidOperationException(
 01072                    $"The community-to-date manifest does not contain participant id(s): {string.Join(", ", missingParti
 1073            }
 1074
 01075            participants = filteredParticipants;
 1076        }
 1077
 11078        if (request.ParticipantLimit is not null)
 1079        {
 11080            participants = participants.Take(request.ParticipantLimit.Value).ToList();
 1081        }
 1082
 11083        if (participants.Count == 0)
 1084        {
 01085            throw new InvalidOperationException("No participants remain after applying the requested community-to-date f
 1086        }
 1087
 11088        return participants;
 1089    }
 1090
 1091    private static PreparedExperimentRunMetadata BuildCommunityRunMetadata(
 1092        PreparedExperimentManifest manifest,
 1093        PreparedExperimentParticipantManifest participant,
 1094        string datasetName,
 1095        string startedAtUtc,
 1096        int batchSize)
 1097    {
 11098        return new PreparedExperimentRunMetadata
 11099        {
 11100            Runner = "community-match-experiment-runner",
 11101            TaskType = "community-to-date",
 11102            CommunityContext = manifest.CommunityContext,
 11103            Competition = manifest.Competition,
 11104            SourceDatasetName = manifest.SourceDatasetName,
 11105            DatasetName = datasetName,
 11106            SliceKind = string.IsNullOrWhiteSpace(manifest.SliceKind)
 11107                ? "community-to-date"
 11108                : manifest.SliceKind,
 11109            SliceKey = manifest.SliceKey,
 11110            SourcePoolKey = manifest.SourcePoolKey,
 11111            SelectedItemIdsHash = string.IsNullOrWhiteSpace(manifest.SelectedItemIdsHash)
 11112                ? ExperimentArtifactSupport.ComputeSelectedItemIdsHash(
 11113                    manifest.SelectedItemIds.Count > 0
 11114                        ? manifest.SelectedItemIds
 01115                        : manifest.Items.Select(item => item.SliceDatasetItemId))
 11116                : manifest.SelectedItemIdsHash,
 11117            SelectedItemIdsCount = manifest.SelectedItemIds.Count > 0 ? manifest.SelectedItemIds.Count : manifest.Items.
 11118            SampleSize = manifest.SampleSize > 0 ? manifest.SampleSize : manifest.Items.Count,
 11119            StartedAtUtc = startedAtUtc,
 11120            SampleSeed = manifest.SampleSeed,
 11121            SampleMethod = string.IsNullOrWhiteSpace(manifest.SampleMethod)
 11122                ? "community-to-date"
 11123                : manifest.SampleMethod,
 11124            IncludeJustification = false,
 11125            SourceDatasetKind = "community-to-date",
 11126            DatasetItemIdMap = PreparedExperimentSupport.CreateDatasetItemIdMap(manifest),
 11127            Model = participant.DisplayName,
 11128            ObservationName = "community-match-prediction",
 11129            RunSubjectKind = "participant",
 11130            RunSubjectId = participant.ParticipantId,
 11131            RunSubjectDisplayName = participant.DisplayName,
 11132            BatchStrategy = "simple-batched",
 11133            BatchSize = batchSize,
 11134            BatchCount = null
 11135        };
 1136    }
 1137
 1138    private static string BuildCommunityRunFamilyName(PreparedExperimentManifest manifest, string startedAtUtc)
 1139    {
 01140        var communityToken = ExperimentArtifactSupport.Slugify(manifest.CommunityContext);
 01141        var sliceToken = ExperimentArtifactSupport.Slugify(string.IsNullOrWhiteSpace(manifest.SliceKey) ? "community-to-
 01142        return $"community-to-date__{communityToken}__{sliceToken}__{BuildRunTimestampToken(startedAtUtc)}";
 1143    }
 1144
 1145    private static string BuildCommunityParticipantRunName(
 1146        string runFamilyName,
 1147        PreparedExperimentParticipantManifest participant)
 1148    {
 11149        var participantToken = ExperimentArtifactSupport.Slugify($"{participant.DisplayName}-{participant.ParticipantId}
 11150        return $"{runFamilyName}__{participantToken}";
 1151    }
 1152
 1153    private static string BuildRunTimestampToken(string startedAtUtc)
 1154    {
 01155        return startedAtUtc.ToLowerInvariant().Replace(':', '-');
 1156    }
 1157
 1158    private static JsonElement CreateCommunityPredictionPayload(PreparedExperimentParticipantPrediction prediction)
 1159    {
 11160        return JsonSerializer.SerializeToElement(new
 11161        {
 11162            status = prediction.Status,
 11163            homeGoals = prediction.HomeGoals,
 11164            awayGoals = prediction.AwayGoals,
 11165            kicktippPoints = prediction.KicktippPoints
 11166        }, PreparedExperimentCommandSupport.JsonOptions);
 1167    }
 1168
 1169    private static Prediction? CreatePredictionOrNull(PreparedExperimentParticipantPrediction prediction)
 1170    {
 11171        return prediction.HomeGoals is int homeGoals && prediction.AwayGoals is int awayGoals
 11172            ? new Prediction(homeGoals, awayGoals)
 11173            : null;
 1174    }
 1175
 1176    private static void ConfigureCommunityPredictionObservation(
 1177        Activity? activity,
 1178        PreparedExperimentParticipantManifest participant,
 1179        PreparedExperimentParticipantPrediction prediction,
 1180        PreparedExperimentManifestItem item,
 1181        JsonElement predictionPayload)
 1182    {
 11183        if (activity is null)
 1184        {
 01185            return;
 1186        }
 1187
 11188        activity.SetTag("langfuse.observation.type", "generation");
 11189        activity.SetTag("gen_ai.request.model", "kicktipp-community");
 11190        activity.SetTag("langfuse.observation.input", JsonSerializer.Serialize(new
 11191        {
 11192            source = "kicktipp-community",
 11193            participantId = participant.ParticipantId,
 11194            participantDisplayName = participant.DisplayName,
 11195            item.SourceDatasetItemId,
 11196            item.TippSpielId
 11197        }, TraceJsonOptions));
 11198        activity.SetTag("langfuse.observation.output", predictionPayload.GetRawText());
 11199        new PredictionTelemetryMetadata(item.HomeTeam, item.AwayTeam).ApplyToObservation(activity);
 11200        activity.SetTag("langfuse.observation.metadata.participantId", participant.ParticipantId);
 11201        activity.SetTag("langfuse.observation.metadata.participantDisplayName", participant.DisplayName);
 11202        activity.SetTag("langfuse.observation.metadata.predictionStatus", prediction.Status);
 11203        activity.SetTag("langfuse.observation.metadata.sourceDatasetItemId", item.SourceDatasetItemId);
 1204
 11205        if (!string.IsNullOrWhiteSpace(item.TippSpielId))
 1206        {
 11207            activity.SetTag("langfuse.observation.metadata.tippSpielId", item.TippSpielId);
 1208        }
 11209    }
 1210
 1211    private static string DeriveDatasetName(PreparedExperimentRunMetadata runMetadata, PreparedExperimentManifest manife
 1212    {
 11213        return runMetadata.DatasetName
 11214               ?? manifest.SliceDatasetName
 11215               ?? throw new InvalidOperationException("No dataset name was provided for the experiment run.");
 1216    }
 1217
 1218    private static string GetCommunityContext(PreparedExperimentRunMetadata runMetadata, PreparedExperimentManifest mani
 1219    {
 11220        return !string.IsNullOrWhiteSpace(runMetadata.CommunityContext)
 11221            ? runMetadata.CommunityContext
 11222            : !string.IsNullOrWhiteSpace(manifest.CommunityContext)
 11223                ? manifest.CommunityContext
 11224                : throw new InvalidOperationException("Run metadata or manifest must contain communityContext.");
 1225    }
 1226
 1227    private static string BuildOutcomeKey(string homeTeam, string awayTeam, int matchday)
 1228    {
 11229        return string.Join("|", matchday, homeTeam.Trim(), awayTeam.Trim());
 1230    }
 1231
 1232    private static bool IsWarmupBatchTask(string taskType)
 1233    {
 11234        return string.Equals(taskType, "repeated-match", StringComparison.OrdinalIgnoreCase)
 11235            || string.Equals(taskType, "repeated-match-slice", StringComparison.OrdinalIgnoreCase);
 1236    }
 1237
 1238    private static string DescribeBatching(PreparedExperimentRunMetadata runMetadata, int batchTotal)
 1239    {
 11240        if (string.Equals(runMetadata.TaskType, "repeated-match-slice", StringComparison.OrdinalIgnoreCase))
 1241        {
 01242            return $"parallelism {runMetadata.Parallelism ?? 5}, warmup plus {Math.Max(0, runMetadata.BatchCount ?? 3)} 
 1243        }
 1244
 11245        return string.Equals(runMetadata.TaskType, "repeated-match", StringComparison.OrdinalIgnoreCase)
 11246            ? $"warmup plus {Math.Max(0, batchTotal - 1)} additional batch(es)"
 11247            : $"batch size {runMetadata.BatchSize}";
 1248    }
 1249
 11250    private sealed record PreparedExperimentExecutionResult(
 11251        string DatasetRunId,
 11252        PreparedExperimentExecutionSummary Summary);
 1253
 11254    private sealed record ExperimentPromptRoute(
 11255        IInstructionsTemplateProvider? TemplateProvider,
 11256        LangfusePromptTraceMetadata? TraceMetadata);
 1257}
 1258
 1259internal sealed record PreparedExperimentRunRequest(
 1260    string ManifestPath,
 1261    string RunName,
 1262    string? RunDescription,
 1263    string? RunMetadataFile,
 1264    bool ReplaceRun,
 1265    PreparedExperimentRunOptions Options);
 1266
 1267internal sealed record PreparedExperimentCommunityRunRequest(
 1268    string ManifestPath,
 1269    string? RunFamilyName,
 1270    string? RunDescription,
 1271    string? DatasetName,
 1272    bool ReplaceRuns,
 1273    int BatchSize,
 1274    int? ParticipantLimit,
 1275    IReadOnlySet<string> ParticipantIds);

Methods/Properties

.cctor()
.ctor(Orchestrator.Infrastructure.Factories.IFirebaseServiceFactory, Orchestrator.Infrastructure.Factories.IOpenAiServiceFactory, Orchestrator.Infrastructure.Langfuse.ILangfusePublicApiClient)
ExecuteAsync()
ExecuteCommunityToDateAsync()
BuildBatches(System.Collections.Generic.IReadOnlyList<Orchestrator.Commands.Observability.Experiments.PreparedExperimentManifestItem>, Orchestrator.Commands.Observability.Experiments.PreparedExperimentRunMetadata, string)
CreateRepeatedMatchSliceBatches(System.Collections.Generic.IReadOnlyList<Orchestrator.Commands.Observability.Experiments.PreparedExperimentManifestItem>, int, int)
ResolvePromptRouteAsync()
ApplyBatchingDefaults(Orchestrator.Commands.Observability.Experiments.PreparedExperimentRunMetadata, string)
ExecuteItemAsync()
ExecuteCommunityItemAsync()
PostRunScoresAsync()
PostItemScoreAsync()
WaitForDatasetRunItemsAsync()
DeleteExistingRunIfRequestedAsync()
LoadOutcomeDictionary(System.Collections.Generic.IEnumerable<EHonda.KicktippAi.Core.PersistedMatchOutcome>)
LoadOutcomesAsync()
ConfigureTraceContext(System.Diagnostics.Activity, string, string, string, string, Orchestrator.Commands.Observability.Experiments.PreparedExperimentRunMetadata, Orchestrator.Commands.Observability.Experiments.PreparedExperimentManifestItem, string, System.Collections.Generic.IReadOnlyList<string>, System.Collections.Generic.IReadOnlyDictionary<string, string>, System.Nullable<System.DateTimeOffset>, string)
SetTraceAndRootObservationInput(System.Diagnostics.Activity, string)
SetExperimentItemExpectedOutput(System.Diagnostics.Activity, string)
SetExperimentItemMetadata(System.Diagnostics.Activity, string)
SetTraceAndRootObservationOutput(System.Diagnostics.Activity, string)
SetExperimentRunId(System.Diagnostics.Activity, string)
CreateExperimentItemInputJson(Orchestrator.Commands.Observability.Experiments.PreparedExperimentManifestItem)
CreateExperimentItemExpectedOutputJson(int, int)
CreateExperimentItemMetadataJson(Orchestrator.Commands.Observability.Experiments.PreparedExperimentManifestItem)
SelectParticipants(Orchestrator.Commands.Observability.Experiments.PreparedExperimentManifest, Orchestrator.Commands.Observability.Experiments.PreparedExperimentCommunityRunRequest)
BuildCommunityRunMetadata(Orchestrator.Commands.Observability.Experiments.PreparedExperimentManifest, Orchestrator.Commands.Observability.Experiments.PreparedExperimentParticipantManifest, string, string, int)
BuildCommunityRunFamilyName(Orchestrator.Commands.Observability.Experiments.PreparedExperimentManifest, string)
BuildCommunityParticipantRunName(string, Orchestrator.Commands.Observability.Experiments.PreparedExperimentParticipantManifest)
BuildRunTimestampToken(string)
CreateCommunityPredictionPayload(Orchestrator.Commands.Observability.Experiments.PreparedExperimentParticipantPrediction)
CreatePredictionOrNull(Orchestrator.Commands.Observability.Experiments.PreparedExperimentParticipantPrediction)
ConfigureCommunityPredictionObservation(System.Diagnostics.Activity, Orchestrator.Commands.Observability.Experiments.PreparedExperimentParticipantManifest, Orchestrator.Commands.Observability.Experiments.PreparedExperimentParticipantPrediction, Orchestrator.Commands.Observability.Experiments.PreparedExperimentManifestItem, System.Text.Json.JsonElement)
DeriveDatasetName(Orchestrator.Commands.Observability.Experiments.PreparedExperimentRunMetadata, Orchestrator.Commands.Observability.Experiments.PreparedExperimentManifest)
GetCommunityContext(Orchestrator.Commands.Observability.Experiments.PreparedExperimentRunMetadata, Orchestrator.Commands.Observability.Experiments.PreparedExperimentManifest)
BuildOutcomeKey(string, string, int)
IsWarmupBatchTask(string)
DescribeBatching(Orchestrator.Commands.Observability.Experiments.PreparedExperimentRunMetadata, int)
.ctor(string, Orchestrator.Commands.Observability.Experiments.PreparedExperimentExecutionSummary)
.ctor(OpenAiIntegration.IInstructionsTemplateProvider, OpenAiIntegration.LangfusePromptTraceMetadata)