Enforce source-time provenance across ingest
This commit is contained in:
@@ -23,6 +23,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
attesto "git.rotz.ai/rotzmediagroup/attesto-v1/sdk/go"
|
attesto "git.rotz.ai/rotzmediagroup/attesto-v1/sdk/go"
|
||||||
)
|
)
|
||||||
@@ -44,6 +45,7 @@ func main() {
|
|||||||
|
|
||||||
receipt, err := client.LogEvent(ctx, stream.StreamID, attesto.EventInput{
|
receipt, err := client.LogEvent(ctx, stream.StreamID, attesto.EventInput{
|
||||||
SourceRef: "decision-42",
|
SourceRef: "decision-42",
|
||||||
|
OccurredAt: time.Now().UTC().Format(time.RFC3339Nano),
|
||||||
Payload: attesto.M{
|
Payload: attesto.M{
|
||||||
"model": "risk-classifier",
|
"model": "risk-classifier",
|
||||||
"score": 0.92,
|
"score": 0.92,
|
||||||
@@ -53,10 +55,15 @@ func main() {
|
|||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println(receipt.StreamEventID, receipt.EventHash)
|
fmt.Println(receipt.StreamEventID, receipt.EventHash)
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Attesto stores source-system time separately from backend ingest time.
|
||||||
|
`OccurredAt` must be RFC3339 with a timezone offset. The Go SDK fills it with
|
||||||
|
`time.Now().UTC()` when omitted, but production integrations should pass the
|
||||||
|
real upstream event timestamp whenever the source system provides one.
|
||||||
|
|
||||||
## Verification
|
## Verification
|
||||||
|
|
||||||
Remote verification uses Attesto's public `/v2/verify` API. Offline receipt
|
Remote verification uses Attesto's public `/v2/verify` API. Offline receipt
|
||||||
|
|||||||
55
client.go
55
client.go
@@ -158,17 +158,8 @@ func (c *Client) GetTenantStream(ctx context.Context, streamID string) (*TenantS
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) LogEvent(ctx context.Context, streamID string, input EventInput, options ...RequestOptions) (*EventReceipt, error) {
|
func (c *Client) LogEvent(ctx context.Context, streamID string, input EventInput, options ...RequestOptions) (*EventReceipt, error) {
|
||||||
if input.EventType == "" {
|
if err := normalizeEventInput(&input); err != nil {
|
||||||
input.EventType = "inference"
|
return nil, err
|
||||||
}
|
|
||||||
if input.SourceKind == "" {
|
|
||||||
input.SourceKind = "sdk"
|
|
||||||
}
|
|
||||||
if input.Payload == nil {
|
|
||||||
input.Payload = M{}
|
|
||||||
}
|
|
||||||
if input.Metadata == nil {
|
|
||||||
input.Metadata = M{}
|
|
||||||
}
|
}
|
||||||
var out EventReceipt
|
var out EventReceipt
|
||||||
err := c.requestJSON(ctx, http.MethodPost, "/v2/streams/"+url.PathEscape(streamID)+"/events", nil, input, idempotency(options), &out)
|
err := c.requestJSON(ctx, http.MethodPost, "/v2/streams/"+url.PathEscape(streamID)+"/events", nil, input, idempotency(options), &out)
|
||||||
@@ -180,17 +171,8 @@ func (c *Client) LogEvents(ctx context.Context, streamID string, events []EventI
|
|||||||
return nil, errors.New("max 1000 events per batch")
|
return nil, errors.New("max 1000 events per batch")
|
||||||
}
|
}
|
||||||
for i := range events {
|
for i := range events {
|
||||||
if events[i].EventType == "" {
|
if err := normalizeEventInput(&events[i]); err != nil {
|
||||||
events[i].EventType = "inference"
|
return nil, fmt.Errorf("event %d: %w", i, err)
|
||||||
}
|
|
||||||
if events[i].SourceKind == "" {
|
|
||||||
events[i].SourceKind = "sdk"
|
|
||||||
}
|
|
||||||
if events[i].Payload == nil {
|
|
||||||
events[i].Payload = M{}
|
|
||||||
}
|
|
||||||
if events[i].Metadata == nil {
|
|
||||||
events[i].Metadata = M{}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
body := M{"events": events}
|
body := M{"events": events}
|
||||||
@@ -331,6 +313,9 @@ func (c *Client) RevokeRepositoryWebhookConnector(ctx context.Context, connector
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) IngestSignedWebhookEvent(ctx context.Context, connectorID string, event EventInput, secret string) (*EventReceipt, error) {
|
func (c *Client) IngestSignedWebhookEvent(ctx context.Context, connectorID string, event EventInput, secret string) (*EventReceipt, error) {
|
||||||
|
if err := normalizeEventInput(&event); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
raw, err := json.Marshal(event)
|
raw, err := json.Marshal(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -341,6 +326,32 @@ func (c *Client) IngestSignedWebhookEvent(ctx context.Context, connectorID strin
|
|||||||
return &out, err
|
return &out, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func normalizeEventInput(input *EventInput) error {
|
||||||
|
if input.SourceRef == "" {
|
||||||
|
return errors.New("source ref must not be blank")
|
||||||
|
}
|
||||||
|
if input.EventType == "" {
|
||||||
|
input.EventType = "inference"
|
||||||
|
}
|
||||||
|
if input.SourceKind == "" {
|
||||||
|
input.SourceKind = "sdk"
|
||||||
|
}
|
||||||
|
if input.OccurredAt == "" {
|
||||||
|
input.OccurredAt = time.Now().UTC().Format(time.RFC3339Nano)
|
||||||
|
} else if _, err := time.Parse(time.RFC3339, input.OccurredAt); err != nil {
|
||||||
|
if _, nanoErr := time.Parse(time.RFC3339Nano, input.OccurredAt); nanoErr != nil {
|
||||||
|
return errors.New("occurredAt must include a valid RFC3339 timezone offset")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if input.Payload == nil {
|
||||||
|
input.Payload = M{}
|
||||||
|
}
|
||||||
|
if input.Metadata == nil {
|
||||||
|
input.Metadata = M{}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) IngestRepositoryWebhookEvent(ctx context.Context, connectorID string, rawBody []byte, headers map[string]string) (*EventReceipt, error) {
|
func (c *Client) IngestRepositoryWebhookEvent(ctx context.Context, connectorID string, rawBody []byte, headers map[string]string) (*EventReceipt, error) {
|
||||||
var out EventReceipt
|
var out EventReceipt
|
||||||
err := c.requestRaw(ctx, http.MethodPost, "/v2/connectors/repository-webhooks/"+url.PathEscape(connectorID)+"/events", nil, rawBody, headers, "", &out)
|
err := c.requestRaw(ctx, http.MethodPost, "/v2/connectors/repository-webhooks/"+url.PathEscape(connectorID)+"/events", nil, rawBody, headers, "", &out)
|
||||||
|
|||||||
@@ -31,6 +31,13 @@ func TestClientCallsProductionV2Endpoints(t *testing.T) {
|
|||||||
if r.Header.Get("Idempotency-Key") == "" {
|
if r.Header.Get("Idempotency-Key") == "" {
|
||||||
t.Fatalf("idempotency key missing")
|
t.Fatalf("idempotency key missing")
|
||||||
}
|
}
|
||||||
|
var payload map[string]any
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
|
||||||
|
t.Fatalf("decode event body: %v", err)
|
||||||
|
}
|
||||||
|
if payload["occurredAt"] == "" {
|
||||||
|
t.Fatalf("occurredAt missing from event body")
|
||||||
|
}
|
||||||
json.NewEncoder(w).Encode(EventReceipt{
|
json.NewEncoder(w).Encode(EventReceipt{
|
||||||
StreamID: "str_123", StreamEventID: "evt_123", SeqNo: 1, EventHash: strings.Repeat("a", 64), StreamHeadHash: strings.Repeat("b", 64),
|
StreamID: "str_123", StreamEventID: "evt_123", SeqNo: 1, EventHash: strings.Repeat("a", 64), StreamHeadHash: strings.Repeat("b", 64),
|
||||||
Receipt: SignedReceipt{Payload: M{}, ReceiptHash: strings.Repeat("c", 64), Signature: ReceiptSignature{Alg: "ed25519"}},
|
Receipt: SignedReceipt{Payload: M{}, ReceiptHash: strings.Repeat("c", 64), Signature: ReceiptSignature{Alg: "ed25519"}},
|
||||||
|
|||||||
@@ -260,6 +260,7 @@ func (a *app) events(ctx context.Context, args []string) error {
|
|||||||
streamID := fs.String("stream-id", "", "stream id")
|
streamID := fs.String("stream-id", "", "stream id")
|
||||||
sourceRef := fs.String("source-ref", "", "source reference")
|
sourceRef := fs.String("source-ref", "", "source reference")
|
||||||
eventType := fs.String("event-type", "inference", "event type")
|
eventType := fs.String("event-type", "inference", "event type")
|
||||||
|
occurredAt := fs.String("occurred-at", "", "source timestamp (RFC3339, defaults to current runtime time)")
|
||||||
payloadFile := fs.String("payload-file", "", "JSON payload file")
|
payloadFile := fs.String("payload-file", "", "JSON payload file")
|
||||||
metadataFile := fs.String("metadata-file", "", "JSON metadata file")
|
metadataFile := fs.String("metadata-file", "", "JSON metadata file")
|
||||||
if err := fs.Parse(args[1:]); err != nil {
|
if err := fs.Parse(args[1:]); err != nil {
|
||||||
@@ -273,7 +274,7 @@ func (a *app) events(ctx context.Context, args []string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
receipt, err := client.LogEvent(ctx, *streamID, attesto.EventInput{SourceRef: *sourceRef, EventType: *eventType, Payload: payload, Metadata: metadata})
|
receipt, err := client.LogEvent(ctx, *streamID, attesto.EventInput{SourceRef: *sourceRef, EventType: *eventType, OccurredAt: *occurredAt, Payload: payload, Metadata: metadata})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user