sdk(P1.6): client-side head tracking — your SDK is a fork detector
Completes the verification chain (P1.2 -> P1.1 -> P1.3 -> P1.6). The client remembers the last accepted (seq_no, event_hash) per stream and checks every new receipt links forward; if the server rewinds a sequence number or presents a divergent lineage, log_event / log_events raise AttestoForkDetected (Go: *ForkDetectedError) and the stored head is NOT advanced. The customer's own machine becomes the fork detector — no trust in any Attesto-side check. - Python: HeadStore protocol + FileHeadStore (~/.attesto/heads.json, atomic, 0600, default) + MemoryHeadStore; wired into sync and async v2 clients; head_store=None disables. - TypeScript: HeadStore + MemoryHeadStore (default, edge-safe); Node-only FileHeadStore kept in a separate module (@attesto/sdk/heads-file) so the core bundle imports no node:fs; headStore: null disables. - Go: HeadStore interface + MemoryHeadStore (default) + NewFileHeadStore; WithHeadStore option; WithHeadStore(nil) disables. Same forward/rewind/divergence/gap semantics across all three (unit-tested: in-order advance, forged-rewind fork, divergent-next fork, forward-gap accept, file-store restart persistence). Existing v2 client tests pin head_store=None (they replay overlapping seq). READMEs gain a "Your SDK is a witness" section. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
17
README.md
17
README.md
@@ -112,6 +112,23 @@ comp := attesto.VerifyCompleteness(events, 5, 8) // no event
|
|||||||
numbers must be gap-free and each event's `prev_event_hash` must chain to the
|
numbers must be gap-free and each event's `prev_event_hash` must chain to the
|
||||||
previous event's `event_hash`.
|
previous event's `event_hash`.
|
||||||
|
|
||||||
|
## Your SDK is a witness
|
||||||
|
|
||||||
|
The client remembers the last accepted `(seqNo, eventHash)` per stream and checks
|
||||||
|
every new receipt links forward. If the server ever rewinds a sequence number or
|
||||||
|
presents a divergent lineage, `LogEvent` / `LogEvents` return a
|
||||||
|
`*ForkDetectedError` and the stored head is not advanced. The default store is
|
||||||
|
in-memory; use a file store for fork detection across process invocations, or
|
||||||
|
disable it.
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Persist across CLI invocations (atomic, 0600 at ~/.attesto/heads.json):
|
||||||
|
client, _ := attesto.NewClient(apiKey, attesto.WithHeadStore(attesto.NewFileHeadStore("")))
|
||||||
|
|
||||||
|
// Disable fork detection:
|
||||||
|
client, _ = attesto.NewClient(apiKey, attesto.WithHeadStore(nil))
|
||||||
|
```
|
||||||
|
|
||||||
## Operator and Admin Endpoints
|
## Operator and Admin Endpoints
|
||||||
|
|
||||||
System-key clients are created with `attesto.NewClient`. Tenant/operator
|
System-key clients are created with `attesto.NewClient`. Tenant/operator
|
||||||
|
|||||||
42
client.go
42
client.go
@@ -23,6 +23,7 @@ type Client struct {
|
|||||||
maxRetries int
|
maxRetries int
|
||||||
userAgent string
|
userAgent string
|
||||||
validateKey bool
|
validateKey bool
|
||||||
|
headStore HeadStore
|
||||||
}
|
}
|
||||||
|
|
||||||
type Option func(*Client) error
|
type Option func(*Client) error
|
||||||
@@ -51,6 +52,7 @@ func newClient(bearer string, validateKey bool, opts ...Option) (*Client, error)
|
|||||||
maxRetries: 3,
|
maxRetries: 3,
|
||||||
userAgent: "attesto-go/" + SDKVersion,
|
userAgent: "attesto-go/" + SDKVersion,
|
||||||
validateKey: validateKey,
|
validateKey: validateKey,
|
||||||
|
headStore: NewMemoryHeadStore(),
|
||||||
}
|
}
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
if err := opt(client); err != nil {
|
if err := opt(client); err != nil {
|
||||||
@@ -112,6 +114,16 @@ func WithUserAgent(userAgent string) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithHeadStore sets the store used for client-side fork detection. The default
|
||||||
|
// is an in-memory store; pass NewFileHeadStore(path) to persist across process
|
||||||
|
// invocations, or nil to disable fork detection.
|
||||||
|
func WithHeadStore(store HeadStore) Option {
|
||||||
|
return func(c *Client) error {
|
||||||
|
c.headStore = store
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) CreateStream(ctx context.Context, input StreamCreateInput, options ...RequestOptions) (*Stream, error) {
|
func (c *Client) CreateStream(ctx context.Context, input StreamCreateInput, options ...RequestOptions) (*Stream, error) {
|
||||||
var out Stream
|
var out Stream
|
||||||
if input.Metadata == nil {
|
if input.Metadata == nil {
|
||||||
@@ -172,8 +184,23 @@ func (c *Client) LogEvent(ctx context.Context, streamID string, input EventInput
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
var out EventReceipt
|
var out EventReceipt
|
||||||
err := c.requestJSON(ctx, http.MethodPost, "/v2/streams/"+url.PathEscape(streamID)+"/events", nil, input, idempotency(options), &out)
|
if err := c.requestJSON(ctx, http.MethodPost, "/v2/streams/"+url.PathEscape(streamID)+"/events", nil, input, idempotency(options), &out); err != nil {
|
||||||
return &out, err
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := c.trackHead(out); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// trackHead checks the receipt extends the stored head (returning a
|
||||||
|
// *ForkDetectedError on a rewind/divergence) and advances the store. No-op when
|
||||||
|
// fork detection is disabled (headStore is nil).
|
||||||
|
func (c *Client) trackHead(receipt EventReceipt) error {
|
||||||
|
if c.headStore == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return checkAndAdvanceHead(c.headStore, receipt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) LogEvents(ctx context.Context, streamID string, events []EventInput, options ...RequestOptions) (*EventBatchResponse, error) {
|
func (c *Client) LogEvents(ctx context.Context, streamID string, events []EventInput, options ...RequestOptions) (*EventBatchResponse, error) {
|
||||||
@@ -196,8 +223,15 @@ func (c *Client) LogEvents(ctx context.Context, streamID string, events []EventI
|
|||||||
}
|
}
|
||||||
body := M{"events": events}
|
body := M{"events": events}
|
||||||
var out EventBatchResponse
|
var out EventBatchResponse
|
||||||
err := c.requestJSON(ctx, http.MethodPost, "/v2/streams/"+url.PathEscape(streamID)+"/events/batch", nil, body, idempotency(options), &out)
|
if err := c.requestJSON(ctx, http.MethodPost, "/v2/streams/"+url.PathEscape(streamID)+"/events/batch", nil, body, idempotency(options), &out); err != nil {
|
||||||
return &out, err
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, receipt := range out.Receipts {
|
||||||
|
if err := c.trackHead(receipt); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) GetReceipt(ctx context.Context, streamEventID string) (*EventReceipt, error) {
|
func (c *Client) GetReceipt(ctx context.Context, streamEventID string) (*EventReceipt, error) {
|
||||||
|
|||||||
160
heads.go
Normal file
160
heads.go
Normal file
@@ -0,0 +1,160 @@
|
|||||||
|
package attesto
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HeadStore persists the last accepted (seqNo, eventHash) head per stream so the
|
||||||
|
// client can detect a rewound or divergent server history.
|
||||||
|
type HeadStore interface {
|
||||||
|
Get(streamID string) (seqNo int64, eventHash string, ok bool)
|
||||||
|
Set(streamID string, seqNo int64, eventHash string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ForkDetectedError reports that a receipt did not extend the last accepted head
|
||||||
|
// for its stream. The store is NOT advanced when this is returned.
|
||||||
|
type ForkDetectedError struct {
|
||||||
|
StreamID string
|
||||||
|
ExpectedSeq int64
|
||||||
|
ExpectedHash string
|
||||||
|
GotSeq int64
|
||||||
|
GotPrevHash string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ForkDetectedError) Error() string {
|
||||||
|
return fmt.Sprintf(
|
||||||
|
"fork detected on stream %s: receipt (seq=%d prev=%s) does not extend "+
|
||||||
|
"last accepted head (seq=%d hash=%s)",
|
||||||
|
e.StreamID, e.GotSeq, e.GotPrevHash, e.ExpectedSeq, e.ExpectedHash)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MemoryHeadStore keeps heads in process memory; safe for concurrent use.
|
||||||
|
type MemoryHeadStore struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
heads map[string][2]any
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMemoryHeadStore() *MemoryHeadStore {
|
||||||
|
return &MemoryHeadStore{heads: map[string][2]any{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MemoryHeadStore) Get(streamID string) (int64, string, bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
entry, ok := s.heads[streamID]
|
||||||
|
if !ok {
|
||||||
|
return 0, "", false
|
||||||
|
}
|
||||||
|
return entry[0].(int64), entry[1].(string), true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MemoryHeadStore) Set(streamID string, seqNo int64, eventHash string) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.heads[streamID] = [2]any{seqNo, eventHash}
|
||||||
|
}
|
||||||
|
|
||||||
|
// FileHeadStore persists heads to a JSON file (default ~/.attesto/heads.json),
|
||||||
|
// giving fork detection across separate process invocations. Writes are atomic.
|
||||||
|
type FileHeadStore struct {
|
||||||
|
path string
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewFileHeadStore uses the given path, or ~/.attesto/heads.json when empty.
|
||||||
|
func NewFileHeadStore(path string) *FileHeadStore {
|
||||||
|
if path == "" {
|
||||||
|
if home, err := os.UserHomeDir(); err == nil {
|
||||||
|
path = filepath.Join(home, ".attesto", "heads.json")
|
||||||
|
} else {
|
||||||
|
path = ".attesto-heads.json"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &FileHeadStore{path: path}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *FileHeadStore) load() map[string][2]json.RawMessage {
|
||||||
|
raw, err := os.ReadFile(s.path)
|
||||||
|
if err != nil {
|
||||||
|
return map[string][2]json.RawMessage{}
|
||||||
|
}
|
||||||
|
out := map[string][2]json.RawMessage{}
|
||||||
|
if err := json.Unmarshal(raw, &out); err != nil {
|
||||||
|
return map[string][2]json.RawMessage{}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *FileHeadStore) Get(streamID string) (int64, string, bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
entry, ok := s.load()[streamID]
|
||||||
|
if !ok {
|
||||||
|
return 0, "", false
|
||||||
|
}
|
||||||
|
var seqNo int64
|
||||||
|
var eventHash string
|
||||||
|
if json.Unmarshal(entry[0], &seqNo) != nil || json.Unmarshal(entry[1], &eventHash) != nil {
|
||||||
|
return 0, "", false
|
||||||
|
}
|
||||||
|
return seqNo, eventHash, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *FileHeadStore) Set(streamID string, seqNo int64, eventHash string) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
heads := map[string][2]any{}
|
||||||
|
for key, entry := range s.load() {
|
||||||
|
var n int64
|
||||||
|
var h string
|
||||||
|
_ = json.Unmarshal(entry[0], &n)
|
||||||
|
_ = json.Unmarshal(entry[1], &h)
|
||||||
|
heads[key] = [2]any{n, h}
|
||||||
|
}
|
||||||
|
heads[streamID] = [2]any{seqNo, eventHash}
|
||||||
|
body, err := json.Marshal(heads)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := os.MkdirAll(filepath.Dir(s.path), 0o700); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tmp, err := os.CreateTemp(filepath.Dir(s.path), ".heads-")
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tmpName := tmp.Name()
|
||||||
|
_, writeErr := tmp.Write(body)
|
||||||
|
closeErr := tmp.Close()
|
||||||
|
if writeErr != nil || closeErr != nil {
|
||||||
|
_ = os.Remove(tmpName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = os.Chmod(tmpName, 0o600)
|
||||||
|
_ = os.Rename(tmpName, s.path)
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkAndAdvanceHead verifies a receipt extends the stored head, then advances
|
||||||
|
// the store. Returns *ForkDetectedError (without advancing) when the receipt
|
||||||
|
// rewinds or collides on seqNo, or claims to be the immediate next event but
|
||||||
|
// does not chain. A forward gap is accepted and advances.
|
||||||
|
func checkAndAdvanceHead(store HeadStore, receipt EventReceipt) error {
|
||||||
|
if storedSeq, storedHash, ok := store.Get(receipt.StreamID); ok {
|
||||||
|
if receipt.SeqNo <= storedSeq ||
|
||||||
|
(receipt.SeqNo == storedSeq+1 && receipt.PrevEventHash != storedHash) {
|
||||||
|
return &ForkDetectedError{
|
||||||
|
StreamID: receipt.StreamID,
|
||||||
|
ExpectedSeq: storedSeq,
|
||||||
|
ExpectedHash: storedHash,
|
||||||
|
GotSeq: receipt.SeqNo,
|
||||||
|
GotPrevHash: receipt.PrevEventHash,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
store.Set(receipt.StreamID, receipt.SeqNo, receipt.EventHash)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
82
heads_test.go
Normal file
82
heads_test.go
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
package attesto
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func receipt(seqNo int64, eventHash, prevEventHash string) EventReceipt {
|
||||||
|
return EventReceipt{
|
||||||
|
StreamID: "str_demo",
|
||||||
|
SeqNo: seqNo,
|
||||||
|
EventHash: eventHash,
|
||||||
|
PrevEventHash: prevEventHash,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemoryHeadStoreInOrderAdvances(t *testing.T) {
|
||||||
|
store := NewMemoryHeadStore()
|
||||||
|
for _, r := range []EventReceipt{receipt(1, "h1", ""), receipt(2, "h2", "h1"), receipt(3, "h3", "h2")} {
|
||||||
|
if err := checkAndAdvanceHead(store, r); err != nil {
|
||||||
|
t.Fatalf("unexpected: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if seq, hash, ok := store.Get("str_demo"); !ok || seq != 3 || hash != "h3" {
|
||||||
|
t.Errorf("head = (%d,%s,%v)", seq, hash, ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForgedRewoundReceiptIsFork(t *testing.T) {
|
||||||
|
store := NewMemoryHeadStore()
|
||||||
|
_ = checkAndAdvanceHead(store, receipt(1, "h1", ""))
|
||||||
|
_ = checkAndAdvanceHead(store, receipt(2, "h2", "h1"))
|
||||||
|
err := checkAndAdvanceHead(store, receipt(2, "h2-fork", "h1"))
|
||||||
|
var fork *ForkDetectedError
|
||||||
|
if !errors.As(err, &fork) {
|
||||||
|
t.Fatalf("expected *ForkDetectedError, got %v", err)
|
||||||
|
}
|
||||||
|
if seq, hash, _ := store.Get("str_demo"); seq != 2 || hash != "h2" {
|
||||||
|
t.Errorf("store advanced past fork: (%d,%s)", seq, hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDivergentNextEventIsFork(t *testing.T) {
|
||||||
|
store := NewMemoryHeadStore()
|
||||||
|
_ = checkAndAdvanceHead(store, receipt(1, "h1", ""))
|
||||||
|
err := checkAndAdvanceHead(store, receipt(2, "h2", "WRONG"))
|
||||||
|
var fork *ForkDetectedError
|
||||||
|
if !errors.As(err, &fork) {
|
||||||
|
t.Fatalf("expected *ForkDetectedError, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForwardGapAccepted(t *testing.T) {
|
||||||
|
store := NewMemoryHeadStore()
|
||||||
|
_ = checkAndAdvanceHead(store, receipt(1, "h1", ""))
|
||||||
|
if err := checkAndAdvanceHead(store, receipt(5, "h5", "h4")); err != nil {
|
||||||
|
t.Fatalf("forward gap should be accepted: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFileHeadStorePersistsAndIs0600(t *testing.T) {
|
||||||
|
path := filepath.Join(t.TempDir(), "heads.json")
|
||||||
|
store := NewFileHeadStore(path)
|
||||||
|
_ = checkAndAdvanceHead(store, receipt(1, "h1", ""))
|
||||||
|
_ = checkAndAdvanceHead(store, receipt(2, "h2", "h1"))
|
||||||
|
info, err := os.Stat(path)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if info.Mode().Perm() != 0o600 {
|
||||||
|
t.Errorf("mode = %o, want 600", info.Mode().Perm())
|
||||||
|
}
|
||||||
|
reopened := NewFileHeadStore(path)
|
||||||
|
if seq, hash, ok := reopened.Get("str_demo"); !ok || seq != 2 || hash != "h2" {
|
||||||
|
t.Errorf("reopened head = (%d,%s,%v)", seq, hash, ok)
|
||||||
|
}
|
||||||
|
if err := checkAndAdvanceHead(reopened, receipt(2, "h2-fork", "h1")); err == nil {
|
||||||
|
t.Error("expected fork on reopened store")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user