< Summary

Information
Class: Orchestrator.Commands.Observability.SyncDataset.SyncDatasetCommand
Assembly: Orchestrator
File(s): /home/runner/work/KicktippAi/KicktippAi/src/Orchestrator/Commands/Observability/SyncDataset/SyncDatasetCommand.cs
Line coverage
88%
Covered lines: 324
Uncovered lines: 43
Coverable lines: 367
Total lines: 560
Line coverage: 88.2%
Branch coverage
62%
Covered branches: 80
Total branches: 128
Branch coverage: 62.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

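| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .cctor() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| ExecuteAsync() | 77.78% | 24 | 18 | 73.53% |
| SyncDatasetItemAsync() | 33.33% | 6 | 6 | 92.31% |
| HasSameCanonicalContent(...) | 0% | 72 | 8 | 0% |
| CreateOrUpdateDatasetAsync() | 50% | 4 | 4 | 100% |
| CreateDryRunDataset(...) | 50% | 2 | 2 | 100% |
| BuildDatasetDefinition(...) | 75% | 16 | 16 | 100% |
| ValidateArtifact(...) | 100% | 6 | 6 | 100% |
| ValidateValueAgainstSchema(...) | 65.91% | 82 | 44 | 73.08% |
| GetSchemaKeys(...) | 66.67% | 12 | 12 | 85.71% |
| GetStringProperty(...) | 50% | 8 | 8 | 83.33% |
| LoadArtifactAsync() | 50% | 2 | 2 | 100% |
| ParseJsonElement(...) | 100% | 1 | 1 | 100% |
| Assert(...) | 50% | 2 | 2 | 66.67% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |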

File(s)

/home/runner/work/KicktippAi/KicktippAi/src/Orchestrator/Commands/Observability/SyncDataset/SyncDatasetCommand.cs

# | Line | Line coverage
 1using System.Text.Json;
 2using System.Text.Json.Serialization;
 3using Microsoft.Extensions.Logging;
 4using Orchestrator.Commands.Observability.Experiments;
 5using Orchestrator.Infrastructure.Langfuse;
 6using Spectre.Console;
 7using Spectre.Console.Cli;
 8
 9namespace Orchestrator.Commands.Observability.SyncDataset;
 10
 11public sealed class SyncDatasetCommand : AsyncCommand<SyncDatasetSettings>
 12{
 113    private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
 114    {
 115        PropertyNameCaseInsensitive = true,
 116        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
 117        WriteIndented = true,
 118        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
 119    };
 20
 121    private static readonly JsonElement DefaultInputSchema = ParseJsonElement(
 122        """
 123        {
 124          "type": "object",
 125          "properties": {
 126            "homeTeam": {
 127              "type": "string",
 128              "minLength": 1,
 129              "description": "Exact home team name from the persisted match outcome"
 130            },
 131            "awayTeam": {
 132              "type": "string",
 133              "minLength": 1,
 134              "description": "Exact away team name from the persisted match outcome"
 135            },
 136            "startsAt": {
 137              "type": "string",
 138              "minLength": 1,
 139              "description": "Localized match start timestamp string emitted by the .NET exporter"
 140            }
 141          },
 142          "required": ["homeTeam", "awayTeam", "startsAt"],
 143          "additionalProperties": false
 144        }
 145        """);
 46
 147    private static readonly JsonElement DefaultExpectedOutputSchema = ParseJsonElement(
 148        """
 149        {
 150          "type": "object",
 151          "properties": {
 152            "homeGoals": {
 153              "type": "integer",
 154              "minimum": 0,
 155              "description": "Actual home goals scored in the completed match"
 156            },
 157            "awayGoals": {
 158              "type": "integer",
 159              "minimum": 0,
 160              "description": "Actual away goals scored in the completed match"
 161            }
 162          },
 163          "required": ["homeGoals", "awayGoals"],
 164          "additionalProperties": false
 165        }
 166        """);
 67
 168    private static readonly JsonElement MetadataSchema = ParseJsonElement(
 169        """
 170        {
 171          "type": "object",
 172          "properties": {
 173            "competition": {
 174              "type": "string",
 175              "minLength": 1,
 176              "description": "Competition identifier"
 177            },
 178            "season": {
 179              "type": "string",
 180              "minLength": 1,
 181              "description": "Season label"
 182            },
 183            "communityContext": {
 184              "type": "string",
 185              "minLength": 1,
 186              "description": "Community context slug"
 187            },
 188            "matchday": {
 189              "type": "integer",
 190              "minimum": 1,
 191              "description": "Bundesliga matchday number"
 192            },
 193            "matchdayLabel": {
 194              "type": "string",
 195              "minLength": 1,
 196              "description": "Human-readable matchday label"
 197            },
 198            "homeTeam": {
 199              "type": "string",
 1100              "minLength": 1,
 1101              "description": "Exact home team name"
 1102            },
 1103            "awayTeam": {
 1104              "type": "string",
 1105              "minLength": 1,
 1106              "description": "Exact away team name"
 1107            },
 1108            "tippSpielId": {
 1109              "type": "string",
 1110              "minLength": 1,
 1111              "description": "Kicktipp match identifier"
 1112            },
 1113            "fixtureIndex": {
 1114              "type": "integer",
 1115              "minimum": 1,
 1116              "description": "One-based fixture index for repeated-match-slice datasets"
 1117            },
 1118            "repetitionIndex": {
 1119              "type": "integer",
 1120              "minimum": 1,
 1121              "description": "One-based repetition index for repeated-match-slice datasets"
 1122            }
 1123          },
 1124          "required": [
 1125            "competition",
 1126            "season",
 1127            "communityContext",
 1128            "matchday",
 1129            "matchdayLabel",
 1130            "homeTeam",
 1131            "awayTeam",
 1132            "tippSpielId"
 1133          ],
 1134          "additionalProperties": false
 1135        }
 1136        """);
 137
 138    private readonly IAnsiConsole _console;
 139    private readonly ILangfusePublicApiClient _langfuseClient;
 140    private readonly ILogger<SyncDatasetCommand> _logger;
 141
 1142    public SyncDatasetCommand(
 1143        IAnsiConsole console,
 1144        ILangfusePublicApiClient langfuseClient,
 1145        ILogger<SyncDatasetCommand> logger)
 146    {
 1147        _console = console;
 1148        _langfuseClient = langfuseClient;
 1149        _logger = logger;
 1150    }
 151
 152    protected override async Task<int> ExecuteAsync(CommandContext context, SyncDatasetSettings settings, CancellationTo
 153    {
 154        try
 155        {
 1156            var artifact = await LoadArtifactAsync(settings.InputPath, cancellationToken);
 1157            var datasetName = settings.DatasetName ?? artifact.DatasetName;
 1158            if (string.IsNullOrWhiteSpace(datasetName))
 159            {
 0160                throw new InvalidOperationException("Dataset name missing from artifact and no --dataset-name override w
 161            }
 162
 1163            ValidateArtifact(artifact, datasetName);
 164
 1165            PreparedExperimentSupport.ReportProgress(
 1166                $"Starting dataset sync for '{datasetName}' with {artifact.Items.Count} item(s). Dry run: {settings.DryR
 167
 1168            var datasetDefinition = BuildDatasetDefinition(artifact, datasetName);
 1169            var dataset = settings.DryRun
 1170                ? CreateDryRunDataset(datasetDefinition)
 1171                : await CreateOrUpdateDatasetAsync(datasetDefinition, cancellationToken);
 172
 1173            PreparedExperimentSupport.ReportProgress(
 1174                settings.DryRun
 1175                    ? $"Validated dataset definition for '{datasetName}' in dry-run mode."
 1176                    : $"Dataset '{datasetName}' is ready with id '{dataset.Id}'.");
 177
 1178            var created = 0;
 1179            var updated = 0;
 1180            var unchanged = 0;
 1181            var failures = new List<object>();
 182
 1183            for (var itemIndex = 0; itemIndex < artifact.Items.Count; itemIndex += 1)
 184            {
 1185                var item = artifact.Items[itemIndex];
 186
 1187                if (settings.DryRun)
 188                {
 1189                    created += 1;
 1190                    continue;
 191                }
 192
 193                try
 194                {
 1195                    PreparedExperimentSupport.ReportProgress(
 1196                        $"Syncing dataset item {itemIndex + 1}/{artifact.Items.Count}: {item.Id}.");
 1197                    var disposition = await SyncDatasetItemAsync(datasetName, item, cancellationToken);
 198                    switch (disposition)
 199                    {
 200                        case SyncDisposition.Created:
 1201                            created += 1;
 1202                            break;
 203                        case SyncDisposition.Updated:
 0204                            updated += 1;
 0205                            break;
 206                        default:
 0207                            unchanged += 1;
 208                            break;
 209                    }
 1210                }
 0211                catch (Exception ex)
 212                {
 0213                    PreparedExperimentSupport.ReportProgress(
 0214                        $"Dataset item {itemIndex + 1}/{artifact.Items.Count} failed: {item.Id}. {ex.Message}");
 0215                    failures.Add(new
 0216                    {
 0217                        itemId = item.Id,
 0218                        message = ex.Message
 0219                    });
 0220                }
 1221            }
 222
 1223            if (failures.Count > 0)
 224            {
 0225                throw new InvalidOperationException($"Dataset sync failed for {failures.Count} item(s): {JsonSerializer.
 226            }
 227
 1228            PreparedExperimentSupport.ReportProgress(
 1229                $"Completed dataset sync for '{datasetName}': created={created}, updated={updated}, unchanged={unchanged
 230
 1231            var summary = new
 1232            {
 1233                datasetName,
 1234                datasetId = dataset.Id,
 1235                datasetInputSchemaKeys = GetSchemaKeys(dataset.InputSchema),
 1236                datasetExpectedOutputSchemaKeys = GetSchemaKeys(dataset.ExpectedOutputSchema),
 1237                dryRun = settings.DryRun,
 1238                itemCount = artifact.Items.Count,
 1239                created,
 1240                updated,
 1241                unchanged,
 1242                firstItemId = artifact.Items.FirstOrDefault()?.Id,
 1243                lastItemId = artifact.Items.LastOrDefault()?.Id
 1244            };
 245
 1246            _console.WriteLine(JsonSerializer.Serialize(summary, SerializerOptions));
 1247            return 0;
 248        }
 0249        catch (Exception ex)
 250        {
 0251            _logger.LogError(ex, "Error executing sync-dataset command");
 0252            _console.MarkupLine($"[red]Error:[/] {Markup.Escape(ex.Message)}");
 0253            return 1;
 254        }
 1255    }
 256
 257    private async Task<SyncDisposition> SyncDatasetItemAsync(
 258        string datasetName,
 259        SyncDatasetArtifactItem item,
 260        CancellationToken cancellationToken)
 261    {
 1262        var existingItem = await _langfuseClient.GetDatasetItemAsync(item.Id, cancellationToken);
 263
 1264        if (existingItem is not null && HasSameCanonicalContent(existingItem, item, datasetName))
 265        {
 0266            return SyncDisposition.Unchanged;
 267        }
 268
 1269        await _langfuseClient.CreateDatasetItemAsync(
 1270            new LangfuseCreateDatasetItemRequest(
 1271                item.Id,
 1272                datasetName,
 1273                item.Input,
 1274                item.ExpectedOutput,
 1275                item.Metadata),
 1276            cancellationToken);
 277
 1278        return existingItem is null ? SyncDisposition.Created : SyncDisposition.Updated;
 1279    }
 280
 281    private static bool HasSameCanonicalContent(LangfuseDatasetItem existingItem, SyncDatasetArtifactItem item, string d
 282    {
 0283        if (!string.IsNullOrWhiteSpace(existingItem.DatasetName)
 0284            && !string.Equals(existingItem.DatasetName, datasetName, StringComparison.Ordinal))
 285        {
 0286            throw new InvalidOperationException(
 0287                $"Dataset item '{item.Id}' already exists in dataset '{existingItem.DatasetName}', not '{datasetName}'."
 288        }
 289
 0290        return LangfuseJsonUtilities.StableEquals(existingItem.Input, item.Input)
 0291            && LangfuseJsonUtilities.StableEquals(existingItem.ExpectedOutput, item.ExpectedOutput)
 0292            && LangfuseJsonUtilities.StableEquals(existingItem.Metadata, item.Metadata);
 293    }
 294
 295    private async Task<LangfuseDataset> CreateOrUpdateDatasetAsync(
 296        LangfuseCreateDatasetRequest datasetDefinition,
 297        CancellationToken cancellationToken)
 298    {
 1299        var createdDataset = await _langfuseClient.CreateDatasetAsync(datasetDefinition, cancellationToken);
 1300        return createdDataset with
 1301        {
 1302            InputSchema = LangfuseJsonUtilities.IsDefined(createdDataset.InputSchema)
 1303                ? createdDataset.InputSchema
 1304                : datasetDefinition.InputSchema ?? default,
 1305            ExpectedOutputSchema = LangfuseJsonUtilities.IsDefined(createdDataset.ExpectedOutputSchema)
 1306                ? createdDataset.ExpectedOutputSchema
 1307                : datasetDefinition.ExpectedOutputSchema ?? default
 1308        };
 1309    }
 310
 311    private static LangfuseDataset CreateDryRunDataset(LangfuseCreateDatasetRequest datasetDefinition)
 312    {
 1313        return new LangfuseDataset(
 1314            string.Empty,
 1315            datasetDefinition.Name,
 1316            datasetDefinition.Description,
 1317            datasetDefinition.Metadata is JsonElement metadata ? metadata : default,
 1318            datasetDefinition.InputSchema ?? default,
 1319            datasetDefinition.ExpectedOutputSchema ?? default);
 320    }
 321
 322    private static LangfuseCreateDatasetRequest BuildDatasetDefinition(SyncDatasetArtifact artifact, string datasetName)
 323    {
 1324        var firstItemMetadata = artifact.Items.FirstOrDefault()?.Metadata ?? default;
 1325        var competition = GetStringProperty(firstItemMetadata, "competition") ?? "bundesliga-2025-26";
 1326        var season = GetStringProperty(firstItemMetadata, "season") ?? "2025/2026";
 1327        var communityContext = GetStringProperty(firstItemMetadata, "communityContext")
 1328            ?? datasetName.Split('/', StringSplitOptions.RemoveEmptyEntries).Last();
 329
 1330        var datasetMetadata = LangfuseJsonUtilities.IsDefined(artifact.DatasetMetadata)
 1331            ? artifact.DatasetMetadata
 1332            : JsonSerializer.SerializeToElement(new
 1333            {
 1334                competition,
 1335                communityContext,
 1336                scope = "match-centric",
 1337                season
 1338            }, SerializerOptions);
 339
 1340        return new LangfuseCreateDatasetRequest(
 1341            datasetName,
 1342            artifact.DatasetDescription
 1343            ?? $"Hosted dataset for {season} {communityContext} {competition} match experiments",
 1344            datasetMetadata,
 1345            LangfuseJsonUtilities.IsDefined(artifact.InputSchema) ? artifact.InputSchema : DefaultInputSchema,
 1346            LangfuseJsonUtilities.IsDefined(artifact.ExpectedOutputSchema)
 1347                ? artifact.ExpectedOutputSchema
 1348                : DefaultExpectedOutputSchema);
 349    }
 350
 351    private static void ValidateArtifact(SyncDatasetArtifact artifact, string datasetName)
 352    {
 1353        Assert(!string.IsNullOrWhiteSpace(datasetName), "Dataset name must be a non-empty string.");
 354
 1355        var inputSchema = LangfuseJsonUtilities.IsDefined(artifact.InputSchema)
 1356            ? artifact.InputSchema
 1357            : DefaultInputSchema;
 1358        var expectedOutputSchema = LangfuseJsonUtilities.IsDefined(artifact.ExpectedOutputSchema)
 1359            ? artifact.ExpectedOutputSchema
 1360            : DefaultExpectedOutputSchema;
 361
 1362        var seenItemIds = new HashSet<string>(StringComparer.Ordinal);
 1363        foreach (var item in artifact.Items)
 364        {
 1365            Assert(!string.IsNullOrWhiteSpace(item.Id), "Each dataset item must have a non-empty string id.");
 1366            Assert(seenItemIds.Add(item.Id), $"Duplicate dataset item id '{item.Id}' found in artifact.");
 367
 1368            ValidateValueAgainstSchema(item.Input, inputSchema, $"Dataset item '{item.Id}' input");
 1369            ValidateValueAgainstSchema(item.ExpectedOutput, expectedOutputSchema, $"Dataset item '{item.Id}' expectedOut
 1370            ValidateValueAgainstSchema(item.Metadata, MetadataSchema, $"Dataset item '{item.Id}' metadata");
 371        }
 1372    }
 373
 374    private static void ValidateValueAgainstSchema(JsonElement value, JsonElement schema, string label)
 375    {
 1376        Assert(schema.ValueKind == JsonValueKind.Object, $"{label} schema must be an object.");
 377
 1378        var schemaType = schema.GetProperty("type").GetString();
 379        switch (schemaType)
 380        {
 381            case "object":
 1382                Assert(value.ValueKind == JsonValueKind.Object, $"{label} must be an object.");
 383
 1384                var properties = schema.TryGetProperty("properties", out var propertyNode)
 1385                    ? propertyNode
 1386                    : default;
 1387                var required = schema.TryGetProperty("required", out var requiredNode)
 1388                    ? requiredNode.EnumerateArray().Select(entry => entry.GetString()!).ToHashSet(StringComparer.Ordinal
 1389                    : [];
 1390                var actualProperties = value.EnumerateObject().ToList();
 1391                var allowedKeys = properties.ValueKind == JsonValueKind.Object
 1392                    ? properties.EnumerateObject().Select(property => property.Name).ToHashSet(StringComparer.Ordinal)
 1393                    : [];
 394
 1395                if (schema.TryGetProperty("additionalProperties", out var additionalProperties)
 1396                    && additionalProperties.ValueKind == JsonValueKind.False)
 397                {
 1398                    var unexpectedKeys = actualProperties
 1399                        .Select(property => property.Name)
 1400                        .Where(name => !allowedKeys.Contains(name))
 1401                        .ToArray();
 1402                    Assert(
 1403                        unexpectedKeys.Length == 0,
 1404                        $"{label} must not contain unexpected keys: {string.Join(", ", unexpectedKeys)}.");
 405                }
 406
 1407                foreach (var requiredKey in required)
 408                {
 1409                    Assert(value.TryGetProperty(requiredKey, out _), $"{label}.{requiredKey} is required.");
 410                }
 411
 1412                if (properties.ValueKind == JsonValueKind.Object)
 413                {
 1414                    foreach (var actualProperty in actualProperties)
 415                    {
 1416                        if (properties.TryGetProperty(actualProperty.Name, out var propertySchema))
 417                        {
 1418                            ValidateValueAgainstSchema(actualProperty.Value, propertySchema, $"{label}.{actualProperty.N
 419                        }
 420                    }
 421                }
 422
 1423                return;
 424
 425            case "string":
 1426                Assert(value.ValueKind == JsonValueKind.String, $"{label} must be a non-empty string.");
 1427                var stringValue = value.GetString() ?? string.Empty;
 1428                Assert(!string.IsNullOrWhiteSpace(stringValue), $"{label} must be a non-empty string.");
 429
 1430                if (schema.TryGetProperty("minLength", out var minLengthNode))
 431                {
 1432                    Assert(stringValue.Length >= minLengthNode.GetInt32(), $"{label} must be at least {minLengthNode.Get
 433                }
 434
 1435                if (schema.TryGetProperty("maxLength", out var maxLengthNode))
 436                {
 0437                    Assert(stringValue.Length <= maxLengthNode.GetInt32(), $"{label} must be at most {maxLengthNode.GetI
 438                }
 439
 1440                return;
 441
 442            case "integer":
 1443                Assert(value.ValueKind == JsonValueKind.Number, $"{label} must be an integer.");
 1444                if (!value.TryGetInt64(out var integerValue))
 445                {
 0446                    throw new InvalidOperationException($"{label} must be an integer.");
 447                }
 448
 1449                if (schema.TryGetProperty("minimum", out var minimumIntegerNode))
 450                {
 1451                    Assert(integerValue >= minimumIntegerNode.GetInt64(), $"{label} must be at least {minimumIntegerNode
 452                }
 453
 1454                if (schema.TryGetProperty("maximum", out var maximumIntegerNode))
 455                {
 0456                    Assert(integerValue <= maximumIntegerNode.GetInt64(), $"{label} must be at most {maximumIntegerNode.
 457                }
 458
 1459                return;
 460
 461            case "number":
 0462                Assert(value.ValueKind == JsonValueKind.Number, $"{label} must be a number.");
 0463                if (!value.TryGetDouble(out var numberValue))
 464                {
 0465                    throw new InvalidOperationException($"{label} must be a number.");
 466                }
 467
 0468                if (schema.TryGetProperty("minimum", out var minimumNumberNode))
 469                {
 0470                    Assert(numberValue >= minimumNumberNode.GetDouble(), $"{label} must be at least {minimumNumberNode.G
 471                }
 472
 0473                if (schema.TryGetProperty("maximum", out var maximumNumberNode))
 474                {
 0475                    Assert(numberValue <= maximumNumberNode.GetDouble(), $"{label} must be at most {maximumNumberNode.Ge
 476                }
 477
 0478                return;
 479
 480            case "boolean":
 0481                Assert(value.ValueKind is JsonValueKind.True or JsonValueKind.False, $"{label} must be a boolean.");
 0482                return;
 483
 484            default:
 0485                throw new InvalidOperationException($"Unsupported schema type '{schemaType}' for {label}.");
 486        }
 487    }
 488
 489    private static IReadOnlyList<string> GetSchemaKeys(JsonElement schema)
 490    {
 1491        if (!LangfuseJsonUtilities.IsDefined(schema)
 1492            || schema.ValueKind != JsonValueKind.Object
 1493            || !schema.TryGetProperty("properties", out var properties)
 1494            || properties.ValueKind != JsonValueKind.Object)
 495        {
 0496            return [];
 497        }
 498
 1499        return properties.EnumerateObject()
 1500            .Select(property => property.Name)
 1501            .OrderBy(name => name, StringComparer.Ordinal)
 1502            .ToList();
 503    }
 504
 505    private static string? GetStringProperty(JsonElement element, string propertyName)
 506    {
 1507        if (!LangfuseJsonUtilities.IsDefined(element)
 1508            || element.ValueKind != JsonValueKind.Object
 1509            || !element.TryGetProperty(propertyName, out var property)
 1510            || property.ValueKind != JsonValueKind.String)
 511        {
 0512            return null;
 513        }
 514
 1515        return property.GetString();
 516    }
 517
 518    private static async Task<SyncDatasetArtifact> LoadArtifactAsync(string inputPath, CancellationToken cancellationTok
 519    {
 1520        var absolutePath = Path.GetFullPath(inputPath);
 1521        var raw = await File.ReadAllTextAsync(absolutePath, cancellationToken);
 1522        var artifact = JsonSerializer.Deserialize<SyncDatasetArtifact>(raw, SerializerOptions);
 1523        return artifact ?? throw new InvalidOperationException($"Dataset artifact '{absolutePath}' could not be deserial
 1524    }
 525
 526    private static JsonElement ParseJsonElement(string value)
 527    {
 1528        using var document = JsonDocument.Parse(value);
 1529        return document.RootElement.Clone();
 1530    }
 531
 532    private static void Assert(bool condition, string message)
 533    {
 1534        if (!condition)
 535        {
 0536            throw new InvalidOperationException(message);
 537        }
 1538    }
 539
 540    private enum SyncDisposition
 541    {
 542        Created,
 543        Updated,
 544        Unchanged
 545    }
 546
 1547    private sealed record SyncDatasetArtifact(
 1548        [property: JsonPropertyName("datasetName")] string? DatasetName,
 1549        [property: JsonPropertyName("datasetDescription")] string? DatasetDescription,
 1550        [property: JsonPropertyName("datasetMetadata")] JsonElement DatasetMetadata,
 1551        [property: JsonPropertyName("inputSchema")] JsonElement InputSchema,
 1552        [property: JsonPropertyName("expectedOutputSchema")] JsonElement ExpectedOutputSchema,
 1553        [property: JsonPropertyName("items")] IReadOnlyList<SyncDatasetArtifactItem> Items);
 554
 1555    private sealed record SyncDatasetArtifactItem(
 1556        [property: JsonPropertyName("id")] string Id,
 1557        [property: JsonPropertyName("input")] JsonElement Input,
 1558        [property: JsonPropertyName("expectedOutput")] JsonElement ExpectedOutput,
 1559        [property: JsonPropertyName("metadata")] JsonElement Metadata);
 560}