< Summary

Information
Class: Orchestrator.Commands.Observability.SyncDataset.SyncDatasetCommand.SyncDatasetArtifact
Assembly: Orchestrator
File(s): /home/runner/work/KicktippAi/KicktippAi/src/Orchestrator/Commands/Observability/SyncDataset/SyncDatasetCommand.cs
Line coverage
100%
Covered lines: 7
Uncovered lines: 0
Coverable lines: 7
Total lines: 560
Line coverage: 100%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/KicktippAi/KicktippAi/src/Orchestrator/Commands/Observability/SyncDataset/SyncDatasetCommand.cs

# | Line | Line coverage
 1using System.Text.Json;
 2using System.Text.Json.Serialization;
 3using Microsoft.Extensions.Logging;
 4using Orchestrator.Commands.Observability.Experiments;
 5using Orchestrator.Infrastructure.Langfuse;
 6using Spectre.Console;
 7using Spectre.Console.Cli;
 8
 9namespace Orchestrator.Commands.Observability.SyncDataset;
 10
 11public sealed class SyncDatasetCommand : AsyncCommand<SyncDatasetSettings>
 12{
 /// <summary>
 /// Shared System.Text.Json options for this command: web defaults, case-insensitive
 /// property matching, camelCase property names, indented output, and null-valued
 /// properties omitted when writing. In the visible code these options are used to
 /// deserialize the artifact file, to serialize the fallback dataset metadata, and to
 /// print the final summary.
 /// </summary>
 13    private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
 14    {
 15        PropertyNameCaseInsensitive = true,
 16        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
 17        WriteIndented = true,
 18        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
 19    };
 20
 /// <summary>
 /// Fallback schema for the input payload of a dataset item. It is used by
 /// <c>BuildDatasetDefinition</c> and <c>ValidateArtifact</c> whenever the artifact does
 /// not carry its own inputSchema. The schema requires three string properties
 /// (homeTeam, awayTeam, startsAt) and forbids any additional keys.
 /// </summary>
 21    private static readonly JsonElement DefaultInputSchema = ParseJsonElement(
 22        """
 23        {
 24          "type": "object",
 25          "properties": {
 26            "homeTeam": {
 27              "type": "string",
 28              "minLength": 1,
 29              "description": "Exact home team name from the persisted match outcome"
 30            },
 31            "awayTeam": {
 32              "type": "string",
 33              "minLength": 1,
 34              "description": "Exact away team name from the persisted match outcome"
 35            },
 36            "startsAt": {
 37              "type": "string",
 38              "minLength": 1,
 39              "description": "Localized match start timestamp string emitted by the .NET exporter"
 40            }
 41          },
 42          "required": ["homeTeam", "awayTeam", "startsAt"],
 43          "additionalProperties": false
 44        }
 45        """);
 46
 /// <summary>
 /// Fallback schema for the expected output of a dataset item. It is used by
 /// <c>BuildDatasetDefinition</c> and <c>ValidateArtifact</c> whenever the artifact does
 /// not carry its own expectedOutputSchema. The schema requires two non-negative
 /// integer properties (homeGoals, awayGoals) and forbids any additional keys.
 /// </summary>
 47    private static readonly JsonElement DefaultExpectedOutputSchema = ParseJsonElement(
 48        """
 49        {
 50          "type": "object",
 51          "properties": {
 52            "homeGoals": {
 53              "type": "integer",
 54              "minimum": 0,
 55              "description": "Actual home goals scored in the completed match"
 56            },
 57            "awayGoals": {
 58              "type": "integer",
 59              "minimum": 0,
 60              "description": "Actual away goals scored in the completed match"
 61            }
 62          },
 63          "required": ["homeGoals", "awayGoals"],
 64          "additionalProperties": false
 65        }
 66        """);
 67
 /// <summary>
 /// Schema that the metadata of every dataset item is validated against in
 /// <c>ValidateArtifact</c>. Unlike the input and expected-output schemas, the visible
 /// code offers no artifact-level override for it. Eight keys are required;
 /// fixtureIndex and repetitionIndex are declared but optional. Additional keys are
 /// forbidden.
 /// </summary>
 68    private static readonly JsonElement MetadataSchema = ParseJsonElement(
 69        """
 70        {
 71          "type": "object",
 72          "properties": {
 73            "competition": {
 74              "type": "string",
 75              "minLength": 1,
 76              "description": "Competition identifier"
 77            },
 78            "season": {
 79              "type": "string",
 80              "minLength": 1,
 81              "description": "Season label"
 82            },
 83            "communityContext": {
 84              "type": "string",
 85              "minLength": 1,
 86              "description": "Community context slug"
 87            },
 88            "matchday": {
 89              "type": "integer",
 90              "minimum": 1,
 91              "description": "Bundesliga matchday number"
 92            },
 93            "matchdayLabel": {
 94              "type": "string",
 95              "minLength": 1,
 96              "description": "Human-readable matchday label"
 97            },
 98            "homeTeam": {
 99              "type": "string",
 100              "minLength": 1,
 101              "description": "Exact home team name"
 102            },
 103            "awayTeam": {
 104              "type": "string",
 105              "minLength": 1,
 106              "description": "Exact away team name"
 107            },
 108            "tippSpielId": {
 109              "type": "string",
 110              "minLength": 1,
 111              "description": "Kicktipp match identifier"
 112            },
 113            "fixtureIndex": {
 114              "type": "integer",
 115              "minimum": 1,
 116              "description": "One-based fixture index for repeated-match-slice datasets"
 117            },
 118            "repetitionIndex": {
 119              "type": "integer",
 120              "minimum": 1,
 121              "description": "One-based repetition index for repeated-match-slice datasets"
 122            }
 123          },
 124          "required": [
 125            "competition",
 126            "season",
 127            "communityContext",
 128            "matchday",
 129            "matchdayLabel",
 130            "homeTeam",
 131            "awayTeam",
 132            "tippSpielId"
 133          ],
 134          "additionalProperties": false
 135        }
 136        """);
 137
 // Collaborators supplied through the constructor:
 //   _console        - receives the JSON summary and the red error line
 //   _langfuseClient - performs the dataset and dataset-item calls
 //   _logger         - records the exception when the command fails
 138    private readonly IAnsiConsole _console;
 139    private readonly ILangfusePublicApiClient _langfuseClient;
 140    private readonly ILogger<SyncDatasetCommand> _logger;
 141
 /// <summary>
 /// Stores the three collaborators without validating them.
 /// </summary>
 /// <param name="console">Console used for the summary and error output.</param>
 /// <param name="langfuseClient">Client used for all Langfuse calls made by this command.</param>
 /// <param name="logger">Logger used when the command fails.</param>
 142    public SyncDatasetCommand(
 143        IAnsiConsole console,
 144        ILangfusePublicApiClient langfuseClient,
 145        ILogger<SyncDatasetCommand> logger)
 146    {
 147        _console = console;
 148        _langfuseClient = langfuseClient;
 149        _logger = logger;
 150    }
 151
 /// <summary>
 /// Entry point of the command. The visible steps are:
 /// load the artifact from <c>settings.InputPath</c>; resolve the dataset name
 /// (the <c>settings.DatasetName</c> override wins over the name in the artifact);
 /// validate every item; build the dataset definition; create the dataset through
 /// the Langfuse client unless <c>settings.DryRun</c> is set; sync each item; and
 /// finally write a JSON summary to the console.
 /// </summary>
 /// <remarks>
 /// In dry-run mode no Langfuse call is made: a placeholder dataset with an empty id
 /// is used and every item is simply counted as created.
 /// A failure while syncing one item is reported, remembered, and the loop moves on
 /// to the next item. After the loop, any remembered failure raises an
 /// InvalidOperationException, which the outer catch converts to exit code 1.
 /// NOTE(review): the per-item catch handles every Exception, so a cancellation
 /// raised by the token would presumably also be recorded as an item failure and
 /// the loop would continue with the remaining items - confirm this is intended.
 /// NOTE(review): several lines of this listing are cut off at the right margin
 /// (including the end of the signature), so the full text of some messages is not
 /// visible here.
 /// </remarks>
 /// <returns>0 when the sync (or dry run) completes; 1 when any exception is caught,
 /// after it has been logged and printed to the console.</returns>
 152    protected override async Task<int> ExecuteAsync(CommandContext context, SyncDatasetSettings settings, CancellationTo
 153    {
 154        try
 155        {
 156            var artifact = await LoadArtifactAsync(settings.InputPath, cancellationToken);
 157            var datasetName = settings.DatasetName ?? artifact.DatasetName;
 158            if (string.IsNullOrWhiteSpace(datasetName))
 159            {
 160                throw new InvalidOperationException("Dataset name missing from artifact and no --dataset-name override w
 161            }
 162
 163            ValidateArtifact(artifact, datasetName);
 164
 165            PreparedExperimentSupport.ReportProgress(
 166                $"Starting dataset sync for '{datasetName}' with {artifact.Items.Count} item(s). Dry run: {settings.DryR
 167
 168            var datasetDefinition = BuildDatasetDefinition(artifact, datasetName);
 169            var dataset = settings.DryRun
 170                ? CreateDryRunDataset(datasetDefinition)
 171                : await CreateOrUpdateDatasetAsync(datasetDefinition, cancellationToken);
 172
 173            PreparedExperimentSupport.ReportProgress(
 174                settings.DryRun
 175                    ? $"Validated dataset definition for '{datasetName}' in dry-run mode."
 176                    : $"Dataset '{datasetName}' is ready with id '{dataset.Id}'.");
 177
 178            var created = 0;
 179            var updated = 0;
 180            var unchanged = 0;
 181            var failures = new List<object>();
 182
 183            for (var itemIndex = 0; itemIndex < artifact.Items.Count; itemIndex += 1)
 184            {
 185                var item = artifact.Items[itemIndex];
 186
 187                if (settings.DryRun)
 188                {
 189                    created += 1;
 190                    continue;
 191                }
 192
 193                try
 194                {
 195                    PreparedExperimentSupport.ReportProgress(
 196                        $"Syncing dataset item {itemIndex + 1}/{artifact.Items.Count}: {item.Id}.");
 197                    var disposition = await SyncDatasetItemAsync(datasetName, item, cancellationToken);
 198                    switch (disposition)
 199                    {
 200                        case SyncDisposition.Created:
 201                            created += 1;
 202                            break;
 203                        case SyncDisposition.Updated:
 204                            updated += 1;
 205                            break;
 206                        default:
 207                            unchanged += 1;
 208                            break;
 209                    }
 210                }
 211                catch (Exception ex)
 212                {
 213                    PreparedExperimentSupport.ReportProgress(
 214                        $"Dataset item {itemIndex + 1}/{artifact.Items.Count} failed: {item.Id}. {ex.Message}");
 215                    failures.Add(new
 216                    {
 217                        itemId = item.Id,
 218                        message = ex.Message
 219                    });
 220                }
 221            }
 222
 223            if (failures.Count > 0)
 224            {
 225                throw new InvalidOperationException($"Dataset sync failed for {failures.Count} item(s): {JsonSerializer.
 226            }
 227
 228            PreparedExperimentSupport.ReportProgress(
 229                $"Completed dataset sync for '{datasetName}': created={created}, updated={updated}, unchanged={unchanged
 230
 231            var summary = new
 232            {
 233                datasetName,
 234                datasetId = dataset.Id,
 235                datasetInputSchemaKeys = GetSchemaKeys(dataset.InputSchema),
 236                datasetExpectedOutputSchemaKeys = GetSchemaKeys(dataset.ExpectedOutputSchema),
 237                dryRun = settings.DryRun,
 238                itemCount = artifact.Items.Count,
 239                created,
 240                updated,
 241                unchanged,
 242                firstItemId = artifact.Items.FirstOrDefault()?.Id,
 243                lastItemId = artifact.Items.LastOrDefault()?.Id
 244            };
 245
 246            _console.WriteLine(JsonSerializer.Serialize(summary, SerializerOptions));
 247            return 0;
 248        }
 249        catch (Exception ex)
 250        {
 251            _logger.LogError(ex, "Error executing sync-dataset command");
 252            _console.MarkupLine($"[red]Error:[/] {Markup.Escape(ex.Message)}");
 253            return 1;
 254        }
 255    }
 256
 /// <summary>
 /// Synchronises one artifact item with Langfuse and reports what happened.
 /// The item is looked up by id first; when it exists with identical content
 /// nothing is written. Otherwise the item is sent through
 /// <c>CreateDatasetItemAsync</c>, both for new and for already existing ids.
 /// NOTE(review): this relies on the create call replacing an existing item that
 /// has the same id - presumably an upsert; confirm against the client.
 /// </summary>
 /// <param name="datasetName">Dataset the item is written to.</param>
 /// <param name="item">Artifact item to synchronise.</param>
 /// <param name="cancellationToken">Token passed to both client calls.</param>
 /// <returns>Unchanged when the stored item already matches; Created when no item
 /// with this id existed before the write; Updated otherwise.</returns>
 257    private async Task<SyncDisposition> SyncDatasetItemAsync(
 258        string datasetName,
 259        SyncDatasetArtifactItem item,
 260        CancellationToken cancellationToken)
 261    {
 262        var existingItem = await _langfuseClient.GetDatasetItemAsync(item.Id, cancellationToken);
 263
            // Skip the write when the stored item already has the same content.
 264        if (existingItem is not null && HasSameCanonicalContent(existingItem, item, datasetName))
 265        {
 266            return SyncDisposition.Unchanged;
 267        }
 268
 269        await _langfuseClient.CreateDatasetItemAsync(
 270            new LangfuseCreateDatasetItemRequest(
 271                item.Id,
 272                datasetName,
 273                item.Input,
 274                item.ExpectedOutput,
 275                item.Metadata),
 276            cancellationToken);
 277
            // The lookup result from before the write decides between Created and Updated.
 278        return existingItem is null ? SyncDisposition.Created : SyncDisposition.Updated;
 279    }
 280
 /// <summary>
 /// Decides whether a stored dataset item already matches the artifact item.
 /// Input, expected output and metadata are each compared with
 /// <c>LangfuseJsonUtilities.StableEquals</c>; all three must match.
 /// Before comparing, the method refuses to continue when the stored item reports
 /// a non-blank dataset name that differs (ordinal comparison) from the target
 /// dataset. A blank or missing stored dataset name skips that check.
 /// NOTE(review): the signature line is cut off in this listing; the third
 /// parameter is used in the body as datasetName.
 /// </summary>
 /// <returns>True when all three payloads are equal; false otherwise.</returns>
 /// <exception cref="InvalidOperationException">The stored item belongs to a
 /// different dataset.</exception>
 281    private static bool HasSameCanonicalContent(LangfuseDatasetItem existingItem, SyncDatasetArtifactItem item, string d
 282    {
 283        if (!string.IsNullOrWhiteSpace(existingItem.DatasetName)
 284            && !string.Equals(existingItem.DatasetName, datasetName, StringComparison.Ordinal))
 285        {
 286            throw new InvalidOperationException(
 287                $"Dataset item '{item.Id}' already exists in dataset '{existingItem.DatasetName}', not '{datasetName}'."
 288        }
 289
 290        return LangfuseJsonUtilities.StableEquals(existingItem.Input, item.Input)
 291            && LangfuseJsonUtilities.StableEquals(existingItem.ExpectedOutput, item.ExpectedOutput)
 292            && LangfuseJsonUtilities.StableEquals(existingItem.Metadata, item.Metadata);
 293    }
 294
 /// <summary>
 /// Sends the dataset definition through <c>CreateDatasetAsync</c> and returns the
 /// resulting dataset. When the response does not carry an input schema or an
 /// expected-output schema (as judged by <c>LangfuseJsonUtilities.IsDefined</c>),
 /// the corresponding schema from the request is substituted so callers always
 /// see the schema that was sent.
 /// NOTE(review): only a create call is visible here; the update half of the
 /// method name presumably depends on the service treating create as an upsert -
 /// confirm against the client.
 /// </summary>
 /// <param name="datasetDefinition">Definition to send.</param>
 /// <param name="cancellationToken">Token passed to the client call.</param>
 /// <returns>The dataset from the response, with missing schemas filled in from the
 /// request.</returns>
 295    private async Task<LangfuseDataset> CreateOrUpdateDatasetAsync(
 296        LangfuseCreateDatasetRequest datasetDefinition,
 297        CancellationToken cancellationToken)
 298    {
 299        var createdDataset = await _langfuseClient.CreateDatasetAsync(datasetDefinition, cancellationToken);
 300        return createdDataset with
 301        {
 302            InputSchema = LangfuseJsonUtilities.IsDefined(createdDataset.InputSchema)
 303                ? createdDataset.InputSchema
 304                : datasetDefinition.InputSchema ?? default,
 305            ExpectedOutputSchema = LangfuseJsonUtilities.IsDefined(createdDataset.ExpectedOutputSchema)
 306                ? createdDataset.ExpectedOutputSchema
 307                : datasetDefinition.ExpectedOutputSchema ?? default
 308        };
 309    }
 310
 /// <summary>
 /// Builds a local stand-in for the dataset without calling Langfuse. The id is the
 /// empty string; name, description, metadata and both schemas are copied from the
 /// definition, with absent values replaced by the default JsonElement.
 /// </summary>
 /// <param name="datasetDefinition">Definition that would have been sent.</param>
 /// <returns>A dataset value that mirrors the definition.</returns>
 311    private static LangfuseDataset CreateDryRunDataset(LangfuseCreateDatasetRequest datasetDefinition)
 312    {
 313        return new LangfuseDataset(
 314            string.Empty,
 315            datasetDefinition.Name,
 316            datasetDefinition.Description,
 317            datasetDefinition.Metadata is JsonElement metadata ? metadata : default,
 318            datasetDefinition.InputSchema ?? default,
 319            datasetDefinition.ExpectedOutputSchema ?? default);
 320    }
 321
 /// <summary>
 /// Assembles the dataset creation request from the artifact.
 /// Competition, season and community context are read from the metadata of the
 /// first item; each has a fallback (two fixed literals, and the last non-empty
 /// slash-separated segment of the dataset name). These three values feed the
 /// generated description and the generated dataset metadata, both of which are
 /// only used when the artifact does not supply its own. The schemas likewise fall
 /// back to the defaults of this class when the artifact does not define them.
 /// NOTE(review): when the community context is missing and the dataset name has no
 /// non-empty segment (for example a lone slash), the Last call throws because the
 /// split result is empty.
 /// NOTE(review): Items is dereferenced without a null check; an artifact file
 /// without an items property would presumably deserialize to null here - confirm.
 /// </summary>
 /// <param name="artifact">Deserialized artifact.</param>
 /// <param name="datasetName">Resolved dataset name.</param>
 /// <returns>The request describing the dataset.</returns>
 322    private static LangfuseCreateDatasetRequest BuildDatasetDefinition(SyncDatasetArtifact artifact, string datasetName)
 323    {
 324        var firstItemMetadata = artifact.Items.FirstOrDefault()?.Metadata ?? default;
 325        var competition = GetStringProperty(firstItemMetadata, "competition") ?? "bundesliga-2025-26";
 326        var season = GetStringProperty(firstItemMetadata, "season") ?? "2025/2026";
 327        var communityContext = GetStringProperty(firstItemMetadata, "communityContext")
 328            ?? datasetName.Split('/', StringSplitOptions.RemoveEmptyEntries).Last();
 329
 330        var datasetMetadata = LangfuseJsonUtilities.IsDefined(artifact.DatasetMetadata)
 331            ? artifact.DatasetMetadata
 332            : JsonSerializer.SerializeToElement(new
 333            {
 334                competition,
 335                communityContext,
 336                scope = "match-centric",
 337                season
 338            }, SerializerOptions);
 339
 340        return new LangfuseCreateDatasetRequest(
 341            datasetName,
 342            artifact.DatasetDescription
 343            ?? $"Hosted dataset for {season} {communityContext} {competition} match experiments",
 344            datasetMetadata,
 345            LangfuseJsonUtilities.IsDefined(artifact.InputSchema) ? artifact.InputSchema : DefaultInputSchema,
 346            LangfuseJsonUtilities.IsDefined(artifact.ExpectedOutputSchema)
 347                ? artifact.ExpectedOutputSchema
 348                : DefaultExpectedOutputSchema);
 349    }
 350
 /// <summary>
 /// Checks the whole artifact before anything is sent to Langfuse.
 /// The dataset name must be non-blank. Every item must have a non-blank id that is
 /// unique within the artifact (ordinal comparison), and its input, expected output
 /// and metadata must each satisfy their schema. Input and expected output use the
 /// schema from the artifact when defined and the defaults of this class otherwise;
 /// metadata is always checked against <c>MetadataSchema</c>.
 /// Validation stops at the first violation.
 /// NOTE(review): one line of this listing is cut off at the right margin.
 /// </summary>
 /// <param name="artifact">Deserialized artifact.</param>
 /// <param name="datasetName">Resolved dataset name.</param>
 /// <exception cref="InvalidOperationException">Any rule above is violated.</exception>
 351    private static void ValidateArtifact(SyncDatasetArtifact artifact, string datasetName)
 352    {
 353        Assert(!string.IsNullOrWhiteSpace(datasetName), "Dataset name must be a non-empty string.");
 354
 355        var inputSchema = LangfuseJsonUtilities.IsDefined(artifact.InputSchema)
 356            ? artifact.InputSchema
 357            : DefaultInputSchema;
 358        var expectedOutputSchema = LangfuseJsonUtilities.IsDefined(artifact.ExpectedOutputSchema)
 359            ? artifact.ExpectedOutputSchema
 360            : DefaultExpectedOutputSchema;
 361
 362        var seenItemIds = new HashSet<string>(StringComparer.Ordinal);
 363        foreach (var item in artifact.Items)
 364        {
 365            Assert(!string.IsNullOrWhiteSpace(item.Id), "Each dataset item must have a non-empty string id.");
 366            Assert(seenItemIds.Add(item.Id), $"Duplicate dataset item id '{item.Id}' found in artifact.");
 367
 368            ValidateValueAgainstSchema(item.Input, inputSchema, $"Dataset item '{item.Id}' input");
 369            ValidateValueAgainstSchema(item.ExpectedOutput, expectedOutputSchema, $"Dataset item '{item.Id}' expectedOut
 370            ValidateValueAgainstSchema(item.Metadata, MetadataSchema, $"Dataset item '{item.Id}' metadata");
 371        }
 372    }
 373
 /// <summary>
 /// Validates a JSON value against a small hand-written subset of JSON Schema.
 /// Supported types: object, string, integer, number, boolean.
 /// Supported keywords: properties, required, additionalProperties (only the literal
 /// false has an effect), minLength, maxLength, minimum, maximum.
 /// Objects are validated recursively: each actual property that has an entry under
 /// properties is checked against that entry, with the label extended by the
 /// property name. Properties without a schema entry are not inspected further.
 /// </summary>
 /// <remarks>
 /// NOTE(review): the type keyword is read with GetProperty and GetString, so a
 /// schema node without a type, or with a non-string type, fails with whatever those
 /// calls throw rather than with the message used elsewhere in this method.
 /// NOTE(review): the string branch rejects blank and whitespace-only values even
 /// when the schema declares no minLength.
 /// NOTE(review): the integer branch accepts only values that fit a 64-bit integer.
 /// NOTE(review): many message lines in this listing are cut off at the right
 /// margin, so their full text is not visible here.
 /// </remarks>
 /// <param name="value">Value to check.</param>
 /// <param name="schema">Schema node describing the value; must be a JSON object.</param>
 /// <param name="label">Human-readable path used as the prefix of error messages.</param>
 /// <exception cref="InvalidOperationException">The value violates the schema, or the
 /// schema uses a type outside the supported set.</exception>
 374    private static void ValidateValueAgainstSchema(JsonElement value, JsonElement schema, string label)
 375    {
 376        Assert(schema.ValueKind == JsonValueKind.Object, $"{label} schema must be an object.");
 377
 378        var schemaType = schema.GetProperty("type").GetString();
 379        switch (schemaType)
 380        {
 381            case "object":
 382                Assert(value.ValueKind == JsonValueKind.Object, $"{label} must be an object.");
 383
 384                var properties = schema.TryGetProperty("properties", out var propertyNode)
 385                    ? propertyNode
 386                    : default;
 387                var required = schema.TryGetProperty("required", out var requiredNode)
 388                    ? requiredNode.EnumerateArray().Select(entry => entry.GetString()!).ToHashSet(StringComparer.Ordinal
 389                    : [];
 390                var actualProperties = value.EnumerateObject().ToList();
 391                var allowedKeys = properties.ValueKind == JsonValueKind.Object
 392                    ? properties.EnumerateObject().Select(property => property.Name).ToHashSet(StringComparer.Ordinal)
 393                    : [];
 394
 395                if (schema.TryGetProperty("additionalProperties", out var additionalProperties)
 396                    && additionalProperties.ValueKind == JsonValueKind.False)
 397                {
 398                    var unexpectedKeys = actualProperties
 399                        .Select(property => property.Name)
 400                        .Where(name => !allowedKeys.Contains(name))
 401                        .ToArray();
 402                    Assert(
 403                        unexpectedKeys.Length == 0,
 404                        $"{label} must not contain unexpected keys: {string.Join(", ", unexpectedKeys)}.");
 405                }
 406
 407                foreach (var requiredKey in required)
 408                {
 409                    Assert(value.TryGetProperty(requiredKey, out _), $"{label}.{requiredKey} is required.");
 410                }
 411
 412                if (properties.ValueKind == JsonValueKind.Object)
 413                {
 414                    foreach (var actualProperty in actualProperties)
 415                    {
 416                        if (properties.TryGetProperty(actualProperty.Name, out var propertySchema))
 417                        {
 418                            ValidateValueAgainstSchema(actualProperty.Value, propertySchema, $"{label}.{actualProperty.N
 419                        }
 420                    }
 421                }
 422
 423                return;
 424
 425            case "string":
 426                Assert(value.ValueKind == JsonValueKind.String, $"{label} must be a non-empty string.");
 427                var stringValue = value.GetString() ?? string.Empty;
 428                Assert(!string.IsNullOrWhiteSpace(stringValue), $"{label} must be a non-empty string.");
 429
 430                if (schema.TryGetProperty("minLength", out var minLengthNode))
 431                {
 432                    Assert(stringValue.Length >= minLengthNode.GetInt32(), $"{label} must be at least {minLengthNode.Get
 433                }
 434
 435                if (schema.TryGetProperty("maxLength", out var maxLengthNode))
 436                {
 437                    Assert(stringValue.Length <= maxLengthNode.GetInt32(), $"{label} must be at most {maxLengthNode.GetI
 438                }
 439
 440                return;
 441
 442            case "integer":
 443                Assert(value.ValueKind == JsonValueKind.Number, $"{label} must be an integer.");
 444                if (!value.TryGetInt64(out var integerValue))
 445                {
 446                    throw new InvalidOperationException($"{label} must be an integer.");
 447                }
 448
 449                if (schema.TryGetProperty("minimum", out var minimumIntegerNode))
 450                {
 451                    Assert(integerValue >= minimumIntegerNode.GetInt64(), $"{label} must be at least {minimumIntegerNode
 452                }
 453
 454                if (schema.TryGetProperty("maximum", out var maximumIntegerNode))
 455                {
 456                    Assert(integerValue <= maximumIntegerNode.GetInt64(), $"{label} must be at most {maximumIntegerNode.
 457                }
 458
 459                return;
 460
 461            case "number":
 462                Assert(value.ValueKind == JsonValueKind.Number, $"{label} must be a number.");
 463                if (!value.TryGetDouble(out var numberValue))
 464                {
 465                    throw new InvalidOperationException($"{label} must be a number.");
 466                }
 467
 468                if (schema.TryGetProperty("minimum", out var minimumNumberNode))
 469                {
 470                    Assert(numberValue >= minimumNumberNode.GetDouble(), $"{label} must be at least {minimumNumberNode.G
 471                }
 472
 473                if (schema.TryGetProperty("maximum", out var maximumNumberNode))
 474                {
 475                    Assert(numberValue <= maximumNumberNode.GetDouble(), $"{label} must be at most {maximumNumberNode.Ge
 476                }
 477
 478                return;
 479
 480            case "boolean":
 481                Assert(value.ValueKind is JsonValueKind.True or JsonValueKind.False, $"{label} must be a boolean.");
 482                return;
 483
 484            default:
 485                throw new InvalidOperationException($"Unsupported schema type '{schemaType}' for {label}.");
 486        }
 487    }
 488
 /// <summary>
 /// Lists the property names declared under the properties node of a schema, sorted
 /// with ordinal comparison. Used for the schema key lists in the command summary.
 /// </summary>
 /// <param name="schema">Schema to inspect; may be undefined.</param>
 /// <returns>The sorted names, or an empty list when the schema is undefined, is not
 /// an object, or has no object-valued properties node.</returns>
 489    private static IReadOnlyList<string> GetSchemaKeys(JsonElement schema)
 490    {
 491        if (!LangfuseJsonUtilities.IsDefined(schema)
 492            || schema.ValueKind != JsonValueKind.Object
 493            || !schema.TryGetProperty("properties", out var properties)
 494            || properties.ValueKind != JsonValueKind.Object)
 495        {
 496            return [];
 497        }
 498
 499        return properties.EnumerateObject()
 500            .Select(property => property.Name)
 501            .OrderBy(name => name, StringComparer.Ordinal)
 502            .ToList();
 503    }
 504
 /// <summary>
 /// Reads a string-valued property from a JSON object without throwing.
 /// </summary>
 /// <param name="element">Element to read from; may be undefined.</param>
 /// <param name="propertyName">Name of the property to read.</param>
 /// <returns>The string value, or null when the element is undefined, is not an
 /// object, lacks the property, or holds a non-string value there.</returns>
 505    private static string? GetStringProperty(JsonElement element, string propertyName)
 506    {
 507        if (!LangfuseJsonUtilities.IsDefined(element)
 508            || element.ValueKind != JsonValueKind.Object
 509            || !element.TryGetProperty(propertyName, out var property)
 510            || property.ValueKind != JsonValueKind.String)
 511        {
 512            return null;
 513        }
 514
 515        return property.GetString();
 516    }
 517
 /// <summary>
 /// Reads the artifact file and deserializes it with the shared serializer options.
 /// The path is converted to an absolute path first, and that absolute path is the
 /// one used in the error message.
 /// NOTE(review): the signature and the final line are cut off in this listing.
 /// </summary>
 /// <param name="inputPath">Path to the artifact file; may be relative.</param>
 /// <returns>The deserialized artifact.</returns>
 /// <exception cref="InvalidOperationException">Deserialization produced null.</exception>
 518    private static async Task<SyncDatasetArtifact> LoadArtifactAsync(string inputPath, CancellationToken cancellationTok
 519    {
 520        var absolutePath = Path.GetFullPath(inputPath);
 521        var raw = await File.ReadAllTextAsync(absolutePath, cancellationToken);
 522        var artifact = JsonSerializer.Deserialize<SyncDatasetArtifact>(raw, SerializerOptions);
 523        return artifact ?? throw new InvalidOperationException($"Dataset artifact '{absolutePath}' could not be deserial
 524    }
 525
 /// <summary>
 /// Parses JSON text into a standalone JsonElement. The root element is cloned
 /// before the parsing document is disposed, so the returned value stays usable
 /// after this method returns.
 /// </summary>
 /// <param name="value">JSON text to parse.</param>
 /// <returns>A clone of the root element.</returns>
 526    private static JsonElement ParseJsonElement(string value)
 527    {
 528        using var document = JsonDocument.Parse(value);
 529        return document.RootElement.Clone();
 530    }
 531
 /// <summary>
 /// Validation helper: throws when the condition does not hold.
 /// </summary>
 /// <param name="condition">Condition that must be true.</param>
 /// <param name="message">Message of the exception thrown when it is false.</param>
 /// <exception cref="InvalidOperationException">The condition is false.</exception>
 532    private static void Assert(bool condition, string message)
 533    {
 534        if (!condition)
 535        {
 536            throw new InvalidOperationException(message);
 537        }
 538    }
 539
 /// <summary>
 /// Outcome of synchronising a single dataset item, as returned by
 /// <c>SyncDatasetItemAsync</c> and tallied in <c>ExecuteAsync</c>.
 /// </summary>
 540    private enum SyncDisposition
 541    {
                // No item with this id existed before the write.
 542        Created,
                // An item with this id existed and its content differed.
 543        Updated,
                // The stored item already matched; nothing was written.
 544        Unchanged
 545    }
 546
 /// <summary>
 /// Shape of the artifact file read by <c>LoadArtifactAsync</c>. The dataset name and
 /// description are optional. Dataset metadata and both schemas are raw JSON; where
 /// this class consumes them it substitutes generated or default values when they are
 /// not defined. Items holds the dataset items to validate and sync.
 /// </summary>
 1547    private sealed record SyncDatasetArtifact(
 1548        [property: JsonPropertyName("datasetName")] string? DatasetName,
 1549        [property: JsonPropertyName("datasetDescription")] string? DatasetDescription,
 1550        [property: JsonPropertyName("datasetMetadata")] JsonElement DatasetMetadata,
 1551        [property: JsonPropertyName("inputSchema")] JsonElement InputSchema,
 1552        [property: JsonPropertyName("expectedOutputSchema")] JsonElement ExpectedOutputSchema,
 1553        [property: JsonPropertyName("items")] IReadOnlyList<SyncDatasetArtifactItem> Items);
 554
 /// <summary>
 /// One dataset item inside the artifact. The id is used to look the item up in
 /// Langfuse and must be non-blank and unique within the artifact. Input, expected
 /// output and metadata are raw JSON payloads that are validated against their
 /// schemas and sent unchanged.
 /// </summary>
 555    private sealed record SyncDatasetArtifactItem(
 556        [property: JsonPropertyName("id")] string Id,
 557        [property: JsonPropertyName("input")] JsonElement Input,
 558        [property: JsonPropertyName("expectedOutput")] JsonElement ExpectedOutput,
 559        [property: JsonPropertyName("metadata")] JsonElement Metadata);
 560}