From ee8887b97f457e193fce6285ed108f40bc6dee0a Mon Sep 17 00:00:00 2001 From: Codex Date: Mon, 8 Jun 2026 00:35:50 +0200 Subject: [PATCH] Enforce source-time provenance across ingest --- README.md | 11 +++++++-- client.go | 55 +++++++++++++++++++++++++++------------------ client_test.go | 7 ++++++ cmd/attesto/main.go | 3 ++- 4 files changed, 51 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 799bd76..d08a0a1 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ import ( "fmt" "log" "os" + "time" attesto "git.rotz.ai/rotzmediagroup/attesto-v1/sdk/go" ) @@ -43,7 +44,8 @@ func main() { } receipt, err := client.LogEvent(ctx, stream.StreamID, attesto.EventInput{ - SourceRef: "decision-42", + SourceRef: "decision-42", + OccurredAt: time.Now().UTC().Format(time.RFC3339Nano), Payload: attesto.M{ "model": "risk-classifier", "score": 0.92, @@ -53,10 +55,15 @@ func main() { log.Fatal(err) } - fmt.Println(receipt.StreamEventID, receipt.EventHash) +fmt.Println(receipt.StreamEventID, receipt.EventHash) } ``` +Attesto stores source-system time separately from backend ingest time. +`OccurredAt` must be RFC3339 with a timezone offset. The Go SDK fills it with +`time.Now().UTC()` when omitted, but production integrations should pass the +real upstream event timestamp whenever the source system provides one. + ## Verification Remote verification uses Attesto's public `/v2/verify` API. Offline receipt diff --git a/client.go b/client.go index 40a1920..2797412 100644 --- a/client.go +++ b/client.go @@ -158,17 +158,8 @@ func (c *Client) GetTenantStream(ctx context.Context, streamID string) (*TenantS } func (c *Client) LogEvent(ctx context.Context, streamID string, input EventInput, options ...RequestOptions) (*EventReceipt, error) { - if input.EventType == "" { - input.EventType = "inference" - } - if input.SourceKind == "" { - input.SourceKind = "sdk" - } - if input.Payload == nil { - input.Payload = M{} - } - if input.Metadata == nil { - input.Metadata = M{} + if err := normalizeEventInput(&input); err != nil { + return nil, err } var out EventReceipt err := c.requestJSON(ctx, http.MethodPost, "/v2/streams/"+url.PathEscape(streamID)+"/events", nil, input, idempotency(options), &out) @@ -180,17 +171,8 @@ func (c *Client) LogEvents(ctx context.Context, streamID string, events []EventI return nil, errors.New("max 1000 events per batch") } for i := range events { - if events[i].EventType == "" { - events[i].EventType = "inference" - } - if events[i].SourceKind == "" { - events[i].SourceKind = "sdk" - } - if events[i].Payload == nil { - events[i].Payload = M{} - } - if events[i].Metadata == nil { - events[i].Metadata = M{} + if err := normalizeEventInput(&events[i]); err != nil { + return nil, fmt.Errorf("event %d: %w", i, err) } } body := M{"events": events} @@ -331,6 +313,9 @@ func (c *Client) RevokeRepositoryWebhookConnector(ctx context.Context, connector } func (c *Client) IngestSignedWebhookEvent(ctx context.Context, connectorID string, event EventInput, secret string) (*EventReceipt, error) { + if err := normalizeEventInput(&event); err != nil { + return nil, err + } raw, err := json.Marshal(event) if err != nil { return nil, err @@ -341,6 +326,32 @@ func (c *Client) IngestSignedWebhookEvent(ctx context.Context, connectorID strin return &out, err } +func normalizeEventInput(input *EventInput) error { + if input.SourceRef == "" { + return errors.New("source ref must not be blank") + } + if input.EventType == "" { + input.EventType = "inference" + } + if input.SourceKind == "" { + input.SourceKind = "sdk" + } + if input.OccurredAt == "" { + input.OccurredAt = time.Now().UTC().Format(time.RFC3339Nano) + } else if _, err := time.Parse(time.RFC3339, input.OccurredAt); err != nil { + if _, nanoErr := time.Parse(time.RFC3339Nano, input.OccurredAt); nanoErr != nil { + return errors.New("occurredAt must include a valid RFC3339 timezone offset") + } + } + if input.Payload == nil { + input.Payload = M{} + } + if input.Metadata == nil { + input.Metadata = M{} + } + return nil +} + func (c *Client) IngestRepositoryWebhookEvent(ctx context.Context, connectorID string, rawBody []byte, headers map[string]string) (*EventReceipt, error) { var out EventReceipt err := c.requestRaw(ctx, http.MethodPost, "/v2/connectors/repository-webhooks/"+url.PathEscape(connectorID)+"/events", nil, rawBody, headers, "", &out) diff --git a/client_test.go b/client_test.go index c0fb78a..a8c2a1b 100644 --- a/client_test.go +++ b/client_test.go @@ -31,6 +31,13 @@ func TestClientCallsProductionV2Endpoints(t *testing.T) { if r.Header.Get("Idempotency-Key") == "" { t.Fatalf("idempotency key missing") } + var payload map[string]any + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("decode event body: %v", err) + } + if payload["occurredAt"] == "" { + t.Fatalf("occurredAt missing from event body") + } json.NewEncoder(w).Encode(EventReceipt{ StreamID: "str_123", StreamEventID: "evt_123", SeqNo: 1, EventHash: strings.Repeat("a", 64), StreamHeadHash: strings.Repeat("b", 64), Receipt: SignedReceipt{Payload: M{}, ReceiptHash: strings.Repeat("c", 64), Signature: ReceiptSignature{Alg: "ed25519"}}, diff --git a/cmd/attesto/main.go b/cmd/attesto/main.go index 5229916..af0f271 100644 --- a/cmd/attesto/main.go +++ b/cmd/attesto/main.go @@ -260,6 +260,7 @@ func (a *app) events(ctx context.Context, args []string) error { streamID := fs.String("stream-id", "", "stream id") sourceRef := fs.String("source-ref", "", "source reference") eventType := fs.String("event-type", "inference", "event type") + occurredAt := fs.String("occurred-at", "", "source timestamp (RFC3339, defaults to current runtime time)") payloadFile := fs.String("payload-file", "", "JSON payload file") metadataFile := fs.String("metadata-file", "", "JSON metadata file") if err := fs.Parse(args[1:]); err != nil { @@ -273,7 +274,7 @@ func (a *app) events(ctx context.Context, args []string) error { if err != nil { return err } - receipt, err := client.LogEvent(ctx, *streamID, attesto.EventInput{SourceRef: *sourceRef, EventType: *eventType, Payload: payload, Metadata: metadata}) + receipt, err := client.LogEvent(ctx, *streamID, attesto.EventInput{SourceRef: *sourceRef, EventType: *eventType, OccurredAt: *occurredAt, Payload: payload, Metadata: metadata}) if err != nil { return err }