< Summary

Information
Class: Orchestrator.Infrastructure.Langfuse.LangfuseRetryAfterMetadata
Assembly: Orchestrator
File(s): /home/runner/work/KicktippAi/KicktippAi/src/Orchestrator/Infrastructure/Langfuse/LangfusePublicApiClient.cs
Line coverage
100%
Covered lines: 4
Uncovered lines: 0
Coverable lines: 4
Total lines: 360
Line coverage: 100%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/KicktippAi/KicktippAi/src/Orchestrator/Infrastructure/Langfuse/LangfusePublicApiClient.cs

# | Line | Line coverage
 1using System.Net;
 2using System.Net.Http.Headers;
 3using System.Net.Http.Json;
 4using System.Text.Json;
 5using Microsoft.Extensions.Logging;
 6
 7namespace Orchestrator.Infrastructure.Langfuse;
 8
 // NOTE(review): this listing is taken from a coverage report. Every row keeps its line-number gutter and long rows
 // are cut off at a fixed width, so the tail of some statements is not visible here.
 /// <summary>
 /// Client for the Langfuse public HTTP API. Each public method builds a relative request path, sends it through
 /// the injected <see cref="HttpClient"/>, and deserializes the JSON response.
 /// </summary>
 9public sealed class LangfusePublicApiClient : ILangfusePublicApiClient
 10{
     // Shared JSON options: web defaults, with null-valued properties omitted when writing.
 11    private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
 12    {
 13        DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull
 14    };
 15
 16    private readonly HttpClient _httpClient;
 17    private readonly ILogger<LangfusePublicApiClient> _logger;
 18
     /// <summary>Stores the HTTP client and logger used by the requests; neither argument is validated here.</summary>
     /// <remarks>
     /// NOTE(review): every request uses a relative path, so the HttpClient presumably has a BaseAddress
     /// configured by whoever registers it - confirm.
     /// </remarks>
 19    public LangfusePublicApiClient(HttpClient httpClient, ILogger<LangfusePublicApiClient> logger)
 20    {
 21        _httpClient = httpClient;
 22        _logger = logger;
 23    }
 24
     /// <summary>Issues a GET for datasets/{datasetName}, with the name escaped as a single path segment.</summary>
     /// <remarks>
     /// NOTE(review): the trailing arguments are cut off in this listing; the nullable return type suggests
     /// that a 404 maps to null - confirm.
     /// </remarks>
 25    public Task<LangfuseDataset?> GetDatasetAsync(string datasetName, CancellationToken cancellationToken = default)
 26    {
 27        return SendForJsonAsync<LangfuseDataset>(HttpMethod.Get, $"datasets/{EncodePathSegment(datasetName)}", null, tru
 28    }
 29
     /// <summary>
     /// Issues a GET for v2/prompts/{promptName}, optionally filtered by label and version, and returns null when
     /// the API answers 404.
     /// </summary>
     /// <exception cref="ArgumentException">promptName is null, empty, or whitespace.</exception>
 30    public Task<LangfusePrompt?> GetPromptAsync(
 31        string promptName,
 32        string? label = null,
 33        int? version = null,
 34        CancellationToken cancellationToken = default)
 35    {
 36        ArgumentException.ThrowIfNullOrWhiteSpace(promptName);
 37
         // Blank values are dropped by BuildQueryString, so unset filters never reach the URL.
 38        var queryParameters = new List<KeyValuePair<string, string?>>
 39        {
 40            new("label", label),
 41            new("version", version?.ToString(System.Globalization.CultureInfo.InvariantCulture))
 42        };
 43        var path = BuildQueryString($"v2/prompts/{EncodePathSegment(promptName)}", queryParameters);
 44        return SendForJsonAsync<LangfusePrompt>(HttpMethod.Get, path, null, true, cancellationToken);
 45    }
 46
     /// <summary>POSTs the request as JSON to datasets and returns the deserialized response.</summary>
 47    public Task<LangfuseDataset> CreateDatasetAsync(LangfuseCreateDatasetRequest request, CancellationToken cancellation
 48    {
         // returnNullOnNotFound is false, so SendForJsonAsync either returns a non-null value or throws;
         // that is what the null-forgiving operator relies on.
 49        return SendForJsonAsync<LangfuseDataset>(HttpMethod.Post, "datasets", request, false, cancellationToken)!;
 50    }
 51
     /// <summary>Issues a GET for dataset-items/{id}, with the id escaped as a single path segment.</summary>
     /// <remarks>
     /// NOTE(review): the trailing arguments are cut off in this listing; the nullable return type suggests
     /// that a 404 maps to null - confirm.
     /// </remarks>
 52    public Task<LangfuseDatasetItem?> GetDatasetItemAsync(string id, CancellationToken cancellationToken = default)
 53    {
 54        return SendForJsonAsync<LangfuseDatasetItem>(HttpMethod.Get, $"dataset-items/{EncodePathSegment(id)}", null, tru
 55    }
 56
     /// <summary>
     /// Issues a GET for dataset-items with datasetName, version, page and limit query parameters taken from the
     /// request. A 404 is not tolerated here (returnNullOnNotFound is false).
     /// </summary>
     /// <exception cref="ArgumentNullException">request is null.</exception>
     /// <remarks>
     /// NOTE(review): Page and Limit are formatted with the parameterless ToString, unlike the invariant-culture
     /// formatting in GetPromptAsync; presumably they are integers - confirm whether culture can matter.
     /// </remarks>
 57    public Task<LangfusePaginatedResponse<LangfuseDatasetItem>> ListDatasetItemsAsync(
 58        LangfuseListDatasetItemsRequest request,
 59        CancellationToken cancellationToken = default)
 60    {
 61        ArgumentNullException.ThrowIfNull(request);
 62
 63        var queryParameters = new List<KeyValuePair<string, string?>>
 64        {
 65            new("datasetName", request.DatasetName),
 66            new("version", request.Version),
 67            new("page", request.Page.ToString()),
 68            new("limit", request.Limit.ToString())
 69        };
 70
 71        var path = BuildQueryString("dataset-items", queryParameters);
 72        return SendForJsonAsync<LangfusePaginatedResponse<LangfuseDatasetItem>>(HttpMethod.Get, path, null, false, cance
 73    }
 74
     /// <summary>POSTs the request as JSON to dataset-items; a 404 is not tolerated.</summary>
 75    public Task<LangfuseDatasetItem> CreateDatasetItemAsync(LangfuseCreateDatasetItemRequest request, CancellationToken 
 76    {
 77        return SendForJsonAsync<LangfuseDatasetItem>(HttpMethod.Post, "dataset-items", request, false, cancellationToken
 78    }
 79
     /// <summary>POSTs the request as JSON to dataset-run-items; a 404 is not tolerated.</summary>
 80    public Task<LangfuseDatasetRunItem> CreateDatasetRunItemAsync(LangfuseCreateDatasetRunItemRequest request, Cancellat
 81    {
 82        return SendForJsonAsync<LangfuseDatasetRunItem>(HttpMethod.Post, "dataset-run-items", request, false, cancellati
 83    }
 84
     /// <summary>
     /// Issues a GET for datasets/{datasetName}/runs/{runName}, escaping both segments, and returns null when the
     /// API answers 404.
     /// </summary>
 85    public Task<LangfuseDatasetRunWithItems?> GetDatasetRunAsync(string datasetName, string runName, CancellationToken c
 86    {
 87        var path = $"datasets/{EncodePathSegment(datasetName)}/runs/{EncodePathSegment(runName)}";
 88        return SendForJsonAsync<LangfuseDatasetRunWithItems>(HttpMethod.Get, path, null, true, cancellationToken);
 89    }
 90
     /// <summary>Issues a GET for traces/{traceId} and returns null when the API answers 404.</summary>
 91    public Task<LangfuseTraceWithDetails?> GetTraceAsync(string traceId, CancellationToken cancellationToken = default)
 92    {
 93        return SendForJsonAsync<LangfuseTraceWithDetails>(
 94            HttpMethod.Get,
 95            $"traces/{EncodePathSegment(traceId)}",
 96            null,
 97            true,
 98            cancellationToken);
 99    }
 100
     /// <summary>
     /// Issues a GET for traces with sessionId, page, limit and fields query parameters taken from the request.
     /// A 404 is not tolerated here (returnNullOnNotFound is false).
     /// </summary>
     /// <exception cref="ArgumentNullException">request is null.</exception>
 101    public Task<LangfusePaginatedResponse<LangfuseTraceWithDetails>> ListTracesAsync(
 102        LangfuseListTracesRequest request,
 103        CancellationToken cancellationToken = default)
 104    {
 105        ArgumentNullException.ThrowIfNull(request);
 106
 107        var queryParameters = new List<KeyValuePair<string, string?>>
 108        {
 109            new("sessionId", request.SessionId),
 110            new("page", request.Page.ToString()),
 111            new("limit", request.Limit.ToString()),
 112            new("fields", request.Fields)
 113        };
 114
 115        var path = BuildQueryString("traces", queryParameters);
 116        return SendForJsonAsync<LangfusePaginatedResponse<LangfuseTraceWithDetails>>(HttpMethod.Get, path, null, false, 
 117    }
 118
     /// <summary>
     /// Issues a GET for v2/observations with sessionId, limit, cursor and fields query parameters taken from the
     /// request.
     /// </summary>
     /// <exception cref="ArgumentNullException">request is null.</exception>
     /// <remarks>NOTE(review): the 404 handling flag is cut off in this listing - confirm.</remarks>
 119    public Task<LangfuseCursorPaginatedResponse<LangfuseObservationDetail>> ListObservationsAsync(
 120        LangfuseListObservationsRequest request,
 121        CancellationToken cancellationToken = default)
 122    {
 123        ArgumentNullException.ThrowIfNull(request);
 124
 125        var queryParameters = new List<KeyValuePair<string, string?>>
 126        {
 127            new("sessionId", request.SessionId),
 128            new("limit", request.Limit.ToString()),
 129            new("cursor", request.Cursor),
 130            new("fields", request.Fields)
 131        };
 132
 133        var path = BuildQueryString("v2/observations", queryParameters);
 134        return SendForJsonAsync<LangfuseCursorPaginatedResponse<LangfuseObservationDetail>>(HttpMethod.Get, path, null, 
 135    }
 136
     /// <summary>
     /// Issues a DELETE for datasets/{datasetName}/runs/{runName}. Returns false when the API answers 404 and true
     /// for a success status; any other status throws through EnsureSuccessAsync.
     /// </summary>
 137    public async Task<bool> DeleteDatasetRunAsync(string datasetName, string runName, CancellationToken cancellationToke
 138    {
 139        var path = $"datasets/{EncodePathSegment(datasetName)}/runs/{EncodePathSegment(runName)}";
 140        using var response = await _httpClient.DeleteAsync(path, cancellationToken);
 141        if (response.StatusCode == HttpStatusCode.NotFound)
 142        {
 143            return false;
 144        }
 145
 146        await EnsureSuccessAsync(response, cancellationToken);
 147        return true;
 148    }
 149
     /// <summary>
     /// Issues a GET for dataset-run-items filtered by datasetId and runName (both escaped inline). A 404 is not
     /// tolerated here (returnNullOnNotFound is false).
     /// </summary>
     /// <remarks>
     /// NOTE(review): the query string is cut off in this listing; page and limit are presumably appended after
     /// runName - confirm. Unlike the other list methods this one does not go through BuildQueryString.
     /// </remarks>
 150    public Task<LangfusePaginatedResponse<LangfuseDatasetRunItem>> ListDatasetRunItemsAsync(
 151        string datasetId,
 152        string runName,
 153        int page = 1,
 154        int limit = 100,
 155        CancellationToken cancellationToken = default)
 156    {
 157        var path = $"dataset-run-items?datasetId={Uri.EscapeDataString(datasetId)}&runName={Uri.EscapeDataString(runName
 158        return SendForJsonAsync<LangfusePaginatedResponse<LangfuseDatasetRunItem>>(HttpMethod.Get, path, null, false, ca
 159    }
 160
     /// <summary>
     /// Issues a GET for v2/scores with page, limit, fields, name, datasetRunId, sessionId, filter and traceId query
     /// parameters taken from the request. A 404 is not tolerated here (returnNullOnNotFound is false).
     /// </summary>
     /// <exception cref="ArgumentNullException">request is null.</exception>
 161    public Task<LangfusePaginatedResponse<LangfuseScore>> ListScoresAsync(
 162        LangfuseListScoresRequest request,
 163        CancellationToken cancellationToken = default)
 164    {
 165        ArgumentNullException.ThrowIfNull(request);
 166
 167        var queryParameters = new List<KeyValuePair<string, string?>>
 168        {
 169            new("page", request.Page.ToString()),
 170            new("limit", request.Limit.ToString()),
 171            new("fields", request.Fields),
 172            new("name", request.Name),
 173            new("datasetRunId", request.DatasetRunId),
 174            new("sessionId", request.SessionId),
 175            new("filter", request.Filter),
 176            new("traceId", request.TraceId)
 177        };
 178
 179        var path = BuildQueryString("v2/scores", queryParameters);
 180        return SendForJsonAsync<LangfusePaginatedResponse<LangfuseScore>>(HttpMethod.Get, path, null, false, cancellatio
 181    }
 182
     /// <summary>POSTs the request as JSON to scores; a 404 is not tolerated.</summary>
 183    public Task<LangfuseCreateScoreResponse> CreateScoreAsync(LangfuseCreateScoreRequest request, CancellationToken canc
 184    {
 185        return SendForJsonAsync<LangfuseCreateScoreResponse>(HttpMethod.Post, "scores", request, false, cancellationToke
 186    }
 187
     /// <summary>
     /// Sends one request and deserializes the JSON response body into T.
     /// </summary>
     /// <param name="method">HTTP method of the request.</param>
     /// <param name="relativePath">Path (and query) handed to the request message unchanged.</param>
     /// <param name="body">Optional payload; when non-null it is serialized as JSON request content.</param>
     /// <param name="returnNullOnNotFound">When true, a 404 response yields default instead of an exception.</param>
     /// <param name="cancellationToken">Forwarded to the send and read operations.</param>
     /// <returns>The deserialized body, or default for a tolerated 404.</returns>
     /// <exception cref="LangfusePublicApiException">
     /// The status is not a success code, the body deserializes to null, or the body is not valid JSON for T.
     /// In the last case the body is re-read as text, logged, and attached to the exception.
     /// </exception>
 188    private async Task<T?> SendForJsonAsync<T>(
 189        HttpMethod method,
 190        string relativePath,
 191        object? body,
 192        bool returnNullOnNotFound,
 193        CancellationToken cancellationToken)
 194    {
 195        using var request = new HttpRequestMessage(method, relativePath);
 196
 197        if (body is not null)
 198        {
 199            request.Content = JsonContent.Create(body, options: SerializerOptions);
 200        }
 201
 202        using var response = await _httpClient.SendAsync(request, cancellationToken);
         // The 404 shortcut is checked before the generic failure handling so callers can opt into null results.
 203        if (returnNullOnNotFound && response.StatusCode == HttpStatusCode.NotFound)
 204        {
 205            return default;
 206        }
 207
 208        await EnsureSuccessAsync(response, cancellationToken);
 209
 210        try
 211        {
 212            var result = await response.Content.ReadFromJsonAsync<T>(SerializerOptions, cancellationToken);
 213            if (result is null)
 214            {
 215                throw new LangfusePublicApiException(response.StatusCode, relativePath, "Langfuse returned an empty JSON
 216            }
 217
 218            return result;
 219        }
 220        catch (JsonException ex)
 221        {
 222            var responseBody = await response.Content.ReadAsStringAsync(cancellationToken);
 223            _logger.LogError(ex, "Failed to deserialize Langfuse response from {Path}: {Body}", relativePath, responseBo
 224            throw new LangfusePublicApiException(response.StatusCode, relativePath, responseBody, innerException: ex);
 225        }
 226    }
 227
     /// <summary>
     /// Returns immediately for success status codes; otherwise reads the response body and throws a
     /// LangfusePublicApiException carrying the status, request URI, body, and any Retry-After metadata.
     /// </summary>
     /// <remarks>
     /// NOTE(review): the endpoint reported here is the absolute request URI (or an empty string when it is not
     /// available), whereas SendForJsonAsync reports the relative path - confirm the difference is intended.
     /// </remarks>
 228    private static async Task EnsureSuccessAsync(HttpResponseMessage response, CancellationToken cancellationToken)
 229    {
 230        if (response.IsSuccessStatusCode)
 231        {
 232            return;
 233        }
 234
 235        var body = await response.Content.ReadAsStringAsync(cancellationToken);
 236        var retryMetadata = LangfuseRetryAfterUtility.GetRetryAfterMetadata(response.Headers);
 237        throw new LangfusePublicApiException(
 238            response.StatusCode,
 239            response.RequestMessage?.RequestUri?.ToString() ?? string.Empty,
 240            body,
 241            retryMetadata.RetryAfterHeaderValue,
 242            retryMetadata.RetryAfterDelay,
 243            retryMetadata.RetryAfterAtUtc);
 244    }
 245
     /// <summary>Escapes a value with Uri.EscapeDataString so it can be embedded as one path segment.</summary>
 246    private static string EncodePathSegment(string value)
 247    {
 248        return Uri.EscapeDataString(value);
 249    }
 250
     /// <summary>
     /// Appends the given parameters to the path as an escaped query string. Parameters whose value is null,
     /// empty, or whitespace are skipped; when none remain the path is returned unchanged.
     /// </summary>
 251    private static string BuildQueryString(string relativePath, IEnumerable<KeyValuePair<string, string?>> queryParamete
 252    {
 253        var parts = queryParameters
 254            .Where(parameter => !string.IsNullOrWhiteSpace(parameter.Value))
 255            .Select(parameter => $"{Uri.EscapeDataString(parameter.Key)}={Uri.EscapeDataString(parameter.Value!)}")
 256            .ToArray();
 257
 258        return parts.Length == 0
 259            ? relativePath
 260            : $"{relativePath}?{string.Join("&", parts)}";
 261    }
 262}
 263
 /// <summary>Helper that extracts Retry-After information from HTTP response headers.</summary>
 264internal static class LangfuseRetryAfterUtility
 265{
     /// <summary>
     /// Reads the Retry-After header and converts it into its raw text, a non-negative delay, and (for date
     /// values) the target instant.
     /// </summary>
     /// <param name="headers">Response headers to inspect.</param>
     /// <returns>
     /// Metadata whose delay and instant are both null when headers.RetryAfter exposes neither a Delta nor a Date;
     /// the raw text is null when no Retry-After header is present.
     /// </returns>
 266    public static LangfuseRetryAfterMetadata GetRetryAfterMetadata(HttpResponseHeaders headers)
 267    {
         // Raw header text; multiple values are joined with a comma and a space.
 268        var headerValue = headers.TryGetValues("Retry-After", out var values)
 269            ? string.Join(", ", values)
 270            : null;
 271        var retryAfter = headers.RetryAfter;
 272
         // Delta form: a relative delay, clamped so it is never negative. No absolute instant is reported.
 273        if (retryAfter?.Delta is TimeSpan delta)
 274        {
 275            return new LangfuseRetryAfterMetadata(headerValue, delta < TimeSpan.Zero ? TimeSpan.Zero : delta, null);
 276        }
 277
         // Date form: the delay is measured from the current UTC time and clamped to zero once the instant has passed.
 278        if (retryAfter?.Date is DateTimeOffset retryAtUtc)
 279        {
 280            var delay = retryAtUtc - DateTimeOffset.UtcNow;
 281            return new LangfuseRetryAfterMetadata(
 282                headerValue,
 283                delay < TimeSpan.Zero ? TimeSpan.Zero : delay,
 284                retryAtUtc);
 285        }
 286
 287        return new LangfuseRetryAfterMetadata(headerValue, null, null);
 288    }
 289}
 290
 /// <summary>Immutable bundle of Retry-After details produced by LangfuseRetryAfterUtility.</summary>
 /// <param name="RetryAfterHeaderValue">Raw Retry-After header text, or null when the header is absent.</param>
 /// <param name="RetryAfterDelay">Non-negative wait time, or null when no delay could be derived.</param>
 /// <param name="RetryAfterAtUtc">Absolute retry instant for date-form headers, otherwise null.</param>
 1291internal sealed record LangfuseRetryAfterMetadata(
 1292    string? RetryAfterHeaderValue,
 1293    TimeSpan? RetryAfterDelay,
 1294    DateTimeOffset? RetryAfterAtUtc);
 295
 /// <summary>
 /// Exception raised for failed Langfuse API calls. It exposes the HTTP status, the endpoint, the raw response
 /// body, and optional Retry-After details, and builds its message from those values.
 /// </summary>
 296public sealed class LangfusePublicApiException : Exception
 297{
     /// <summary>Creates the exception and copies every argument except innerException into a property.</summary>
     /// <param name="statusCode">HTTP status returned by the API.</param>
     /// <param name="endpoint">Endpoint the request was sent to, as supplied by the caller.</param>
     /// <param name="responseBody">Response body text, or a caller-provided description.</param>
     /// <param name="retryAfterHeaderValue">Raw Retry-After header text, if any.</param>
     /// <param name="retryAfterDelay">Wait time derived from Retry-After, if any.</param>
     /// <param name="retryAfterAtUtc">Absolute retry instant derived from Retry-After, if any.</param>
     /// <param name="innerException">Underlying cause, if any.</param>
     /// <remarks>
     /// NOTE(review): the base-constructor call is cut off in this listing, so how innerException is forwarded is
     /// not visible here - confirm.
     /// </remarks>
 298    public LangfusePublicApiException(
 299        HttpStatusCode statusCode,
 300        string endpoint,
 301        string responseBody,
 302        string? retryAfterHeaderValue = null,
 303        TimeSpan? retryAfterDelay = null,
 304        DateTimeOffset? retryAfterAtUtc = null,
 305        Exception? innerException = null)
 306        : base(BuildMessage(statusCode, endpoint, responseBody, retryAfterHeaderValue, retryAfterDelay, retryAfterAtUtc)
 307    {
 308        StatusCode = statusCode;
 309        Endpoint = endpoint;
 310        ResponseBody = responseBody;
 311        RetryAfterHeaderValue = retryAfterHeaderValue;
 312        RetryAfterDelay = retryAfterDelay;
 313        RetryAfterAtUtc = retryAfterAtUtc;
 314    }
 315
     /// <summary>HTTP status passed to the constructor.</summary>
 316    public HttpStatusCode StatusCode { get; }
 317
     /// <summary>Endpoint passed to the constructor.</summary>
 318    public string Endpoint { get; }
 319
     /// <summary>Response body text passed to the constructor.</summary>
 320    public string ResponseBody { get; }
 321
     /// <summary>Raw Retry-After header text, or null when none was supplied.</summary>
 322    public string? RetryAfterHeaderValue { get; }
 323
     /// <summary>Wait time derived from Retry-After, or null when none was supplied.</summary>
 324    public TimeSpan? RetryAfterDelay { get; }
 325
     /// <summary>Absolute retry instant derived from Retry-After, or null when none was supplied.</summary>
 326    public DateTimeOffset? RetryAfterAtUtc { get; }
 327
     /// <summary>
     /// Builds the exception message from the status, endpoint and response, plus two optional suffixes: the raw
     /// Retry-After text (when not blank) and a retry window (when a delay or an instant is present).
     /// </summary>
     /// <remarks>NOTE(review): the final format string is cut off in this listing - confirm its exact layout.</remarks>
 328    private static string BuildMessage(
 329        HttpStatusCode statusCode,
 330        string endpoint,
 331        string responseBody,
 332        string? retryAfterHeaderValue,
 333        TimeSpan? retryAfterDelay,
 334        DateTimeOffset? retryAfterAtUtc)
 335    {
 336        var retryAfterSuffix = string.IsNullOrWhiteSpace(retryAfterHeaderValue)
 337            ? string.Empty
 338            : $" Retry-After: {retryAfterHeaderValue}.";
 339        var retryWindowSuffix = retryAfterDelay is null && retryAfterAtUtc is null
 340            ? string.Empty
 341            : $" Retry window: {FormatRetryWindow(retryAfterDelay, retryAfterAtUtc)}.";
 342
 343        return $"Langfuse API request failed with status {(int)statusCode} ({statusCode}) for '{endpoint}'. Response: {r
 344    }
 345
     /// <summary>
     /// Describes the retry window: delay and instant when both are present, the delay alone when only it is
     /// present, otherwise the instant. The delay uses the c format and the instant the O format.
     /// </summary>
 346    private static string FormatRetryWindow(TimeSpan? retryAfterDelay, DateTimeOffset? retryAfterAtUtc)
 347    {
 348        if (retryAfterDelay is { } delay && retryAfterAtUtc is { } retryAtUtc)
 349        {
 350            return $"wait {delay:c} until {retryAtUtc:O}";
 351        }
 352
 353        if (retryAfterDelay is { } delayOnly)
 354        {
 355            return $"wait {delayOnly:c}";
 356        }
 357
         // Reached only when the delay is null; BuildMessage calls this method only when at least one value is set.
 358        return $"until {retryAfterAtUtc:O}";
 359    }
 360}