| | | 1 | | using System.Globalization; |
| | | 2 | | using System.Text.Json; |
| | | 3 | | using Microsoft.Extensions.Logging; |
| | | 4 | | using Orchestrator.Commands.Observability.Experiments; |
| | | 5 | | using Orchestrator.Infrastructure.Langfuse; |
| | | 6 | | using Spectre.Console; |
| | | 7 | | using Spectre.Console.Cli; |
| | | 8 | | |
| | | 9 | | namespace Orchestrator.Commands.Observability.ExportExperimentAnalysis; |
| | | 10 | | |
| | | 11 | | public sealed class PublishExperimentAnalysisCommand : AsyncCommand<PublishExperimentAnalysisSettings> |
| | | 12 | | { |
| | | 13 | | private readonly IAnsiConsole _console; |
| | | 14 | | private readonly ILangfusePublicApiClient _langfuseClient; |
| | | 15 | | private readonly ILogger<PublishExperimentAnalysisCommand> _logger; |
| | | 16 | | |
/// <summary>
/// Creates the publish command with the services it needs.
/// </summary>
/// <param name="console">Console that receives the JSON summary on success and the error line on failure.</param>
/// <param name="langfuseClient">
/// Langfuse public API client used to look up, delete and create dataset runs, dataset run items and scores.
/// </param>
/// <param name="logger">Logger that records failures raised while publishing.</param>
public PublishExperimentAnalysisCommand(
    IAnsiConsole console,
    ILangfusePublicApiClient langfuseClient,
    ILogger<PublishExperimentAnalysisCommand> logger)
{
    // Dependencies are stored as supplied; no null validation happens here.
    (_console, _langfuseClient, _logger) = (console, langfuseClient, logger);
}
| | | 26 | | |
| | | 27 | | protected override async Task<int> ExecuteAsync(CommandContext context, PublishExperimentAnalysisSettings settings, |
| | | 28 | | { |
| | | 29 | | try |
| | | 30 | | { |
| | 1 | 31 | | var bundle = await PreparedExperimentCommandSupport.LoadJsonFileAsync<PreparedExperimentAnalysisBundle>( |
| | 1 | 32 | | settings.InputPath, |
| | 1 | 33 | | cancellationToken); |
| | 1 | 34 | | var rowsByRunName = bundle.Rows |
| | 1 | 35 | | .GroupBy(row => row.RunName, StringComparer.Ordinal) |
| | 1 | 36 | | .ToDictionary(group => group.Key, group => group.ToList(), StringComparer.Ordinal); |
| | 1 | 37 | | var experimentName = string.IsNullOrWhiteSpace(settings.ExperimentName) |
| | 1 | 38 | | ? DeriveExperimentName(bundle) |
| | 1 | 39 | | : settings.ExperimentName.Trim(); |
| | 1 | 40 | | var runResults = new List<object>(); |
| | | 41 | | |
| | 1 | 42 | | PreparedExperimentSupport.ReportProgress( |
| | 1 | 43 | | $"Publishing {bundle.Runs.Count} experiment run alias(es) from '{settings.InputPath}' to Langfuse Experi |
| | | 44 | | |
| | 1 | 45 | | foreach (var run in bundle.Runs.OrderBy(run => run.RunName, StringComparer.Ordinal)) |
| | | 46 | | { |
| | 1 | 47 | | if (!rowsByRunName.TryGetValue(run.RunName, out var rows) || rows.Count == 0) |
| | | 48 | | { |
| | 0 | 49 | | throw new InvalidOperationException($"Analysis bundle run '{run.RunName}' has no rows to publish."); |
| | | 50 | | } |
| | | 51 | | |
| | 1 | 52 | | var targetRunName = run.RunName + settings.RunNameSuffix; |
| | 1 | 53 | | var runDescription = string.IsNullOrWhiteSpace(settings.Description) |
| | 1 | 54 | | ? $"Published from analysis bundle '{Path.GetFileName(settings.InputPath)}' for Langfuse Experiments |
| | 1 | 55 | | : settings.Description.Trim(); |
| | | 56 | | |
| | 1 | 57 | | if (settings.DryRun) |
| | | 58 | | { |
| | 0 | 59 | | runResults.Add(new |
| | 0 | 60 | | { |
| | 0 | 61 | | sourceRunName = run.RunName, |
| | 0 | 62 | | targetRunName, |
| | 0 | 63 | | rowCount = rows.Count, |
| | 0 | 64 | | experimentName |
| | 0 | 65 | | }); |
| | 0 | 66 | | continue; |
| | | 67 | | } |
| | | 68 | | |
| | 1 | 69 | | var existingRun = await _langfuseClient.GetDatasetRunAsync( |
| | 1 | 70 | | bundle.DatasetName, |
| | 1 | 71 | | targetRunName, |
| | 1 | 72 | | cancellationToken); |
| | | 73 | | |
| | 1 | 74 | | if (existingRun is not null) |
| | | 75 | | { |
| | 0 | 76 | | if (!settings.ReplaceRuns) |
| | | 77 | | { |
| | 0 | 78 | | throw new InvalidOperationException( |
| | 0 | 79 | | $"Published run alias '{targetRunName}' already exists in dataset '{bundle.DatasetName}'. Us |
| | | 80 | | } |
| | | 81 | | |
| | 0 | 82 | | await _langfuseClient.DeleteDatasetRunAsync(bundle.DatasetName, targetRunName, cancellationToken); |
| | | 83 | | } |
| | | 84 | | |
| | 1 | 85 | | var publishedAtUtc = ExperimentArtifactSupport.FormatStartedAtUtc(DateTimeOffset.UtcNow); |
| | 1 | 86 | | var runMetadata = BuildRunMetadata(bundle, run, rows); |
| | 1 | 87 | | var metadata = PreparedExperimentSupport.BuildLangfuseExperimentMetadata( |
| | 1 | 88 | | runMetadata, |
| | 1 | 89 | | experimentName, |
| | 1 | 90 | | targetRunName, |
| | 1 | 91 | | new Dictionary<string, string?> |
| | 1 | 92 | | { |
| | 1 | 93 | | ["sourceRunName"] = run.RunName, |
| | 1 | 94 | | ["sourceDatasetRunId"] = run.DatasetRunId, |
| | 1 | 95 | | ["publishedFromAnalysisBundle"] = Path.GetFileName(settings.InputPath), |
| | 1 | 96 | | ["publishedAtUtc"] = publishedAtUtc |
| | 1 | 97 | | }); |
| | 1 | 98 | | var createdAt = ParseTimestampOrNull(run.StartedAtUtc); |
| | 1 | 99 | | string? targetDatasetRunId = null; |
| | | 100 | | |
| | 1 | 101 | | PreparedExperimentSupport.ReportProgress( |
| | 1 | 102 | | $"Publishing alias '{targetRunName}' with {rows.Count} item(s)."); |
| | | 103 | | |
| | 1 | 104 | | foreach (var row in rows.OrderBy(row => row.DatasetItemId, StringComparer.Ordinal)) |
| | | 105 | | { |
| | 1 | 106 | | var datasetRunItem = await _langfuseClient.CreateDatasetRunItemAsync( |
| | 1 | 107 | | new LangfuseCreateDatasetRunItemRequest( |
| | 1 | 108 | | targetRunName, |
| | 1 | 109 | | row.DatasetItemId, |
| | 1 | 110 | | row.TraceId, |
| | 1 | 111 | | runDescription, |
| | 1 | 112 | | metadata, |
| | 1 | 113 | | row.ObservationId, |
| | 1 | 114 | | createdAt), |
| | 1 | 115 | | cancellationToken); |
| | | 116 | | |
| | 1 | 117 | | targetDatasetRunId ??= datasetRunItem.DatasetRunId; |
| | | 118 | | } |
| | | 119 | | |
| | 1 | 120 | | if (string.IsNullOrWhiteSpace(targetDatasetRunId)) |
| | | 121 | | { |
| | 0 | 122 | | throw new InvalidOperationException($"Publishing run alias '{targetRunName}' did not return a datase |
| | | 123 | | } |
| | | 124 | | |
| | 1 | 125 | | await PostRunScoresAsync(targetDatasetRunId, run, metadata, cancellationToken); |
| | | 126 | | |
| | 1 | 127 | | runResults.Add(new |
| | 1 | 128 | | { |
| | 1 | 129 | | sourceRunName = run.RunName, |
| | 1 | 130 | | targetRunName, |
| | 1 | 131 | | datasetRunId = targetDatasetRunId, |
| | 1 | 132 | | rowCount = rows.Count, |
| | 1 | 133 | | experimentName |
| | 1 | 134 | | }); |
| | 1 | 135 | | } |
| | | 136 | | |
| | 1 | 137 | | var summary = new |
| | 1 | 138 | | { |
| | 1 | 139 | | bundle.DatasetName, |
| | 1 | 140 | | bundle.TaskType, |
| | 1 | 141 | | experimentName, |
| | 1 | 142 | | dryRun = settings.DryRun, |
| | 1 | 143 | | runCount = runResults.Count, |
| | 1 | 144 | | runs = runResults |
| | 1 | 145 | | }; |
| | | 146 | | |
| | 1 | 147 | | _console.WriteLine(JsonSerializer.Serialize(summary, PreparedExperimentCommandSupport.JsonOptions)); |
| | 1 | 148 | | return 0; |
| | | 149 | | } |
| | 0 | 150 | | catch (Exception ex) |
| | | 151 | | { |
| | 0 | 152 | | _logger.LogError(ex, "Error publishing experiment analysis bundle"); |
| | 0 | 153 | | _console.MarkupLine($"[red]Error:[/] {Markup.Escape(ex.Message)}"); |
| | 0 | 154 | | return 1; |
| | | 155 | | } |
| | 1 | 156 | | } |
| | | 157 | | |
/// <summary>
/// Attaches the two run-level aggregate scores, <c>total_kicktipp_points</c> and <c>avg_kicktipp_points</c>,
/// to a published dataset run.
/// </summary>
/// <param name="datasetRunId">Langfuse dataset run the scores are attached to; also the seed for each score id.</param>
/// <param name="run">Analysis run that supplies the aggregate values and the item count quoted in the comment.</param>
/// <param name="metadata">Metadata attached unchanged to every score.</param>
/// <param name="cancellationToken">Forwarded to every client call.</param>
private async Task PostRunScoresAsync(
    string datasetRunId,
    PreparedExperimentAnalysisRun run,
    JsonElement metadata,
    CancellationToken cancellationToken)
{
    // Both scores share the same human-readable comment.
    var scoreComment = $"Published aggregate score for {run.RowCount} item(s)";

    var scoreRequests = new[]
    {
        new LangfuseCreateScoreRequest(
            "total_kicktipp_points",
            run.AggregateScores.TotalKicktippPoints,
            DatasetRunId: datasetRunId,
            Comment: scoreComment,
            Id: PreparedExperimentSupport.CreateScoreId("total_kicktipp_points", datasetRunId),
            Metadata: metadata),
        new LangfuseCreateScoreRequest(
            "avg_kicktipp_points",
            run.AggregateScores.AvgKicktippPoints,
            DatasetRunId: datasetRunId,
            Comment: scoreComment,
            Id: PreparedExperimentSupport.CreateScoreId("avg_kicktipp_points", datasetRunId),
            Metadata: metadata)
    };

    // Submitted one at a time, in the order listed above; a failure stops the remaining submissions.
    foreach (var scoreRequest in scoreRequests)
    {
        await _langfuseClient.CreateScoreAsync(scoreRequest, cancellationToken);
    }
}
| | | 184 | | |
/// <summary>
/// Translates one analysis-bundle run into the <see cref="PreparedExperimentRunMetadata"/> that the publish
/// flow feeds into the Langfuse experiment metadata of the published alias.
/// </summary>
/// <param name="bundle">Bundle supplying the dataset name; competition and community context are derived from it.</param>
/// <param name="run">Source run whose prompt, model, slice, sampling and subject fields are copied over.</param>
/// <param name="rows">Rows belonging to <paramref name="run"/>; used only to build the dataset item id map.</param>
/// <returns>
/// Metadata with fixed publisher markers: Runner <c>"experiment-analysis-publisher"</c>,
/// BatchStrategy <c>"published-analysis"</c>, and IncludeJustification <c>false</c>.
/// </returns>
private static PreparedExperimentRunMetadata BuildRunMetadata(
    PreparedExperimentAnalysisBundle bundle,
    PreparedExperimentAnalysisRun run,
    IReadOnlyList<PreparedExperimentAnalysisRow> rows)
{
    // Map each SourceDatasetItemId to its DatasetItemId, but only when every row sharing that source id
    // agrees (ordinally) on a single DatasetItemId. Ambiguous source ids are omitted from the map.
    // NOTE(review): ToDictionary throws on a null key, so confirm SourceDatasetItemId is always populated.
    var sourceToPublishedItemId = rows
        .GroupBy(row => row.SourceDatasetItemId, StringComparer.Ordinal)
        .Select(group => (
            SourceId: group.Key,
            PublishedIds: group
                .Select(row => row.DatasetItemId)
                .Distinct(StringComparer.Ordinal)
                .ToList()))
        .Where(candidate => candidate.PublishedIds.Count == 1)
        .ToDictionary(
            candidate => candidate.SourceId,
            candidate => candidate.PublishedIds[0],
            StringComparer.Ordinal);

    var datasetName = bundle.DatasetName;

    return new PreparedExperimentRunMetadata
    {
        Runner = "experiment-analysis-publisher",
        TaskType = run.TaskType,
        CommunityContext = TryResolveCommunityContext(datasetName),
        Competition = TryResolveCompetition(datasetName),
        DatasetName = datasetName,
        PromptKey = run.PromptKey,
        ReasoningEffort = run.ReasoningEffort,
        SliceKind = run.SliceKind,
        SliceKey = run.SliceKey,
        SourcePoolKey = run.SourcePoolKey,
        SelectedItemIdsHash = run.SelectedItemIdsHash,
        SelectedItemIdsCount = run.SelectedItemIdsCount,
        SampleSize = run.SampleSize,
        EvaluationTimestampPolicyKey = run.EvaluationTimestampPolicyKey,
        EvaluationTime = run.EvaluationTime,
        StartedAtUtc = run.StartedAtUtc,
        IncludeJustification = false,
        // The prompt key doubles as the prompt version, and the task type as the source dataset kind.
        PromptVersion = run.PromptKey,
        SourceDatasetKind = run.TaskType,
        DatasetItemIdMap = sourceToPublishedItemId,
        Model = run.Model,
        RunSubjectKind = run.RunSubjectKind,
        RunSubjectId = run.RunSubjectId,
        RunSubjectDisplayName = run.RunSubjectDisplayName,
        BatchStrategy = "published-analysis"
    };
}
| | | 224 | | |
/// <summary>
/// Builds the fallback experiment name used when no explicit experiment name is supplied.
/// </summary>
/// <remarks>
/// The result is the slugified task type, community segment and final segment of the dataset name,
/// joined with <c>"__"</c>. The dataset name is split on <c>'/'</c>; empty segments are dropped and each
/// segment is trimmed. The community is the third segment, or <c>"unknown-community"</c> when there are
/// fewer than three. The tail is the last segment, or <c>"analysis"</c> when there are none. With exactly
/// three segments, the community and the tail are the same segment.
/// </remarks>
/// <param name="bundle">Bundle supplying the task type and dataset name.</param>
/// <returns>The derived experiment name.</returns>
private static string DeriveExperimentName(PreparedExperimentAnalysisBundle bundle)
{
    var segments = bundle.DatasetName.Split(
        '/',
        StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);

    // Split never yields null entries, so a null from *OrDefault can only mean "position not present".
    var community = segments.ElementAtOrDefault(2) ?? "unknown-community";
    var tail = segments.LastOrDefault() ?? "analysis";

    var nameParts = new[] { bundle.TaskType, community, tail };
    return string.Join("__", nameParts.Select(ExperimentArtifactSupport.Slugify));
}
| | | 245 | | |
| | | 246 | | private static string? TryResolveCompetition(string datasetName) |
| | | 247 | | { |
| | 1 | 248 | | var datasetSegments = datasetName.Split('/', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntr |
| | 1 | 249 | | return datasetSegments.Length >= 2 && string.Equals(datasetSegments[0], "match-predictions", StringComparison.Or |
| | 1 | 250 | | ? datasetSegments[1] |
| | 1 | 251 | | : null; |
| | | 252 | | } |
| | | 253 | | |
| | | 254 | | private static string? TryResolveCommunityContext(string datasetName) |
| | | 255 | | { |
| | 1 | 256 | | var datasetSegments = datasetName.Split('/', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntr |
| | 1 | 257 | | return datasetSegments.Length >= 3 && string.Equals(datasetSegments[0], "match-predictions", StringComparison.Or |
| | 1 | 258 | | ? datasetSegments[2] |
| | 1 | 259 | | : null; |
| | | 260 | | } |
| | | 261 | | |
/// <summary>
/// Parses an optional timestamp string with invariant-culture rules and normalises it to UTC.
/// </summary>
/// <param name="value">Timestamp text; may be <c>null</c>.</param>
/// <returns>
/// The parsed instant converted to UTC. Input without an explicit offset is treated as UTC.
/// Returns <c>null</c> when <paramref name="value"/> is null or cannot be parsed.
/// </returns>
private static DateTimeOffset? ParseTimestampOrNull(string? value)
{
    const DateTimeStyles utcStyles = DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal;

    if (!DateTimeOffset.TryParse(value, CultureInfo.InvariantCulture, utcStyles, out var timestamp))
    {
        return null;
    }

    return timestamp;
}
| | | 272 | | } |