diff --git a/README.md b/README.md index 814c9fe..1b38925 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,22 @@ client, _ := attesto.NewClient(apiKey, attesto.WithHeadStore(attesto.NewFileHead client, _ = attesto.NewClient(apiKey, attesto.WithHeadStore(nil)) ``` +## Iterating long listings + +Paginated `List*` methods have `Iter*` twins that walk limit/offset pages +transparently; `Next` returns `(nil, nil)` when the listing is exhausted: + +```go +it := client.IterTenantStreamEvents("str_...", 200) +for { + event, err := it.Next(ctx) + if err != nil || event == nil { + break + } + process(event) +} +``` + ## Verify anchors on-chain `VerifyAnchorOnchain` checks an anchor epoch against the chain itself — one raw diff --git a/iterator.go b/iterator.go new file mode 100644 index 0000000..c842390 --- /dev/null +++ b/iterator.go @@ -0,0 +1,80 @@ +package attesto + +import "context" + +// PageFunc fetches one limit/offset page of list results. +type PageFunc func(ctx context.Context, limit, offset int) ([]M, error) + +// Iterator walks limit/offset pages of a list endpoint transparently, stopping +// on the first short page. Same endpoints, no new API surface. +type Iterator struct { + fetch PageFunc + pageSize int + buffer []M + offset int + done bool +} + +// NewIterator wraps any limit/offset PageFunc. pageSize <= 0 defaults to 100. +func NewIterator(fetch PageFunc, pageSize int) *Iterator { + if pageSize <= 0 { + pageSize = 100 + } + return &Iterator{fetch: fetch, pageSize: pageSize} +} + +// Next returns the next item, or (nil, nil) when the listing is exhausted. +func (it *Iterator) Next(ctx context.Context) (M, error) { + if len(it.buffer) == 0 && !it.done { + page, err := it.fetch(ctx, it.pageSize, it.offset) + if err != nil { + return nil, err + } + it.offset += it.pageSize + if len(page) < it.pageSize { + it.done = true + } + it.buffer = page + } + if len(it.buffer) == 0 { + return nil, nil + } + item := it.buffer[0] + it.buffer = it.buffer[1:] + return item, nil +} + +// IterTenantStreamEvents returns an iterator over a stream's events. +func (c *Client) IterTenantStreamEvents(streamID string, pageSize int) *Iterator { + return NewIterator(func(ctx context.Context, limit, offset int) ([]M, error) { + return c.ListTenantStreamEvents(ctx, streamID, limit, offset) + }, pageSize) +} + +// IterTenantWindows returns an iterator over a stream's windows. +func (c *Client) IterTenantWindows(streamID string, pageSize int) *Iterator { + return NewIterator(func(ctx context.Context, limit, offset int) ([]M, error) { + return c.ListTenantWindows(ctx, streamID, limit, offset) + }, pageSize) +} + +// IterTenantCheckpoints returns an iterator over a stream's checkpoints. +func (c *Client) IterTenantCheckpoints(streamID string, pageSize int) *Iterator { + return NewIterator(func(ctx context.Context, limit, offset int) ([]M, error) { + return c.ListTenantCheckpoints(ctx, streamID, limit, offset) + }, pageSize) +} + +// IterForkEvidence returns an iterator over a stream's fork evidence. +func (c *Client) IterForkEvidence(streamID string, pageSize int) *Iterator { + return NewIterator(func(ctx context.Context, limit, offset int) ([]M, error) { + return c.ListForkEvidence(ctx, streamID, limit, offset) + }, pageSize) +} + +// IterTenantIVCEpochs returns an iterator over a stream's IVC epochs. +func (c *Client) IterTenantIVCEpochs(streamID string, pageSize int) *Iterator { + return NewIterator(func(ctx context.Context, limit, offset int) ([]M, error) { + return c.ListTenantIVCEpochs(ctx, streamID, limit, offset) + }, pageSize) +} diff --git a/iterator_test.go b/iterator_test.go new file mode 100644 index 0000000..4074be0 --- /dev/null +++ b/iterator_test.go @@ -0,0 +1,65 @@ +package attesto + +import ( + "context" + "testing" +) + +func TestIteratorDrainsThreePagesInOrder(t *testing.T) { + pages := map[int][]M{ + 0: {{"seqNo": 1}, {"seqNo": 2}, {"seqNo": 3}}, + 3: {{"seqNo": 4}, {"seqNo": 5}, {"seqNo": 6}}, + 6: {{"seqNo": 7}}, + } + calls := 0 + it := NewIterator(func(_ context.Context, limit, offset int) ([]M, error) { + calls++ + if limit != 3 { + t.Fatalf("limit = %d, want 3", limit) + } + return pages[offset], nil + }, 3) + + var seqs []int + for { + item, err := it.Next(context.Background()) + if err != nil { + t.Fatal(err) + } + if item == nil { + break + } + seqs = append(seqs, item["seqNo"].(int)) + } + want := []int{1, 2, 3, 4, 5, 6, 7} + if len(seqs) != len(want) { + t.Fatalf("seqs = %v", seqs) + } + for i, v := range want { + if seqs[i] != v { + t.Fatalf("seqs = %v, want %v", seqs, want) + } + } + if calls != 3 { + t.Errorf("calls = %d, want 3", calls) + } +} + +func TestIteratorStopsCleanlyOnShortFirstPage(t *testing.T) { + calls := 0 + it := NewIterator(func(_ context.Context, _, _ int) ([]M, error) { + calls++ + return []M{{"streamId": "str_only"}}, nil + }, 100) + first, err := it.Next(context.Background()) + if err != nil || first == nil { + t.Fatalf("first = %v err = %v", first, err) + } + second, err := it.Next(context.Background()) + if err != nil || second != nil { + t.Fatalf("second = %v err = %v", second, err) + } + if calls != 1 { + t.Errorf("calls = %d, want 1 (short page must end iteration)", calls) + } +}