Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions example/submitqueue/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,16 +775,17 @@ func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.Ch
// "origin", PUSHER_TARGET default "main"). When PUSHER_CHECKOUT_PATH is unset it
// returns the fake pusher (commits succeed unless a change URI carries a failure
// marker, see pusher/fake), keeping the example runnable without a git checkout.
func newPusher(logger *zap.Logger, scope tally.Scope) (pusher.Pusher, error) {
func newPusher(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolver) (pusher.Pusher, error) {
checkout := os.Getenv("PUSHER_CHECKOUT_PATH")
if checkout == "" {
logger.Warn("PUSHER_CHECKOUT_PATH not set; using fake pusher (commits succeed unless URI-marked)")
return pushfake.New(), nil
return pushfake.New(resolver), nil
}
return gitpusher.NewPusher(gitpusher.Params{
CheckoutPath: checkout,
Remote: getEnv("PUSHER_REMOTE", "origin"),
Target: getEnv("PUSHER_TARGET", "main"),
Resolver: resolver,
Logger: logger.Sugar(),
MetricsScope: scope.SubScope("pusher"),
}), nil
Expand All @@ -806,7 +807,7 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset.
if err != nil {
return queueRegistry{}, fmt.Errorf("failed to create change provider: %w", err)
}
psh, err := newPusher(logger, scope)
psh, err := newPusher(logger, scope, resolver)
if err != nil {
return queueRegistry{}, fmt.Errorf("failed to create pusher: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions submitqueue/extension/pusher/fake/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/uber/submitqueue/submitqueue/extension/pusher/fake",
visibility = ["//visibility:public"],
deps = [
"//submitqueue/core/changeset",
"//submitqueue/core/fakemarker",
"//submitqueue/entity",
"//submitqueue/extension/pusher",
Expand All @@ -17,6 +18,7 @@ go_test(
srcs = ["fake_test.go"],
embed = [":fake"],
deps = [
"//submitqueue/core/changeset/fake",
"//submitqueue/entity",
"//submitqueue/extension/pusher",
"@com_github_stretchr_testify//assert",
Expand Down
56 changes: 38 additions & 18 deletions submitqueue/extension/pusher/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"fmt"
"sync/atomic"

"github.com/uber/submitqueue/submitqueue/core/changeset"
"github.com/uber/submitqueue/submitqueue/core/fakemarker"
"github.com/uber/submitqueue/submitqueue/entity"
"github.com/uber/submitqueue/submitqueue/extension/pusher"
Expand All @@ -45,35 +46,54 @@ const (

// fakePusher is a pusher.Pusher that reports every change as committed unless a
// marker token in a change URI requests a failure. The atomic counter hands out
// unique synthetic commit SHAs and makes the type safe for concurrent use.
// unique synthetic commit SHAs and makes the type safe for concurrent use. It
// resolves each batch's changes through the injected resolver.
type fakePusher struct {
counter atomic.Uint64
resolver changeset.Resolver
counter atomic.Uint64
}

// New returns a pusher.Pusher that defaults to committing every change and
// honors marker tokens embedded in change URIs.
func New() pusher.Pusher {
return &fakePusher{}
// honors marker tokens embedded in change URIs. The resolver resolves each
// batch's changes.
func New(resolver changeset.Resolver) pusher.Pusher {
return &fakePusher{resolver: resolver}
}

// Push reports every change as committed with a synthetic commit SHA, unless a
// recognized marker token in one of the changes requests a failure.
func (p *fakePusher) Push(_ context.Context, changes []entity.Change) (pusher.Result, error) {
switch fakemarker.TokenInChanges(changes) {
// Push resolves each batch's changes and reports every change as committed with
// a synthetic commit SHA, grouped per batch, unless a recognized marker token in
// one of the changes requests a failure.
func (p *fakePusher) Push(ctx context.Context, batches []entity.Batch) (pusher.Result, error) {
perBatch := make([][]entity.Change, len(batches))
var all []entity.Change
for i, b := range batches {
cs, err := p.resolver.ChangesForBatch(ctx, b)
if err != nil {
return pusher.Result{}, fmt.Errorf("fake: resolve batch %s: %w", b.ID, err)
}
perBatch[i] = cs
all = append(all, cs...)
}

switch fakemarker.TokenInChanges(all) {
case tokenConflict:
return pusher.Result{}, pusher.ErrConflict
case tokenError:
return pusher.Result{}, fmt.Errorf("fake: marked push error")
}

outcomes := make([]pusher.ChangeOutcome, 0, len(changes))
for _, change := range changes {
sha := fmt.Sprintf("fake-%d", p.counter.Add(1))
outcomes = append(outcomes, pusher.ChangeOutcome{
Change: change,
Status: pusher.OutcomeStatusCommitted,
CommitSHAs: []string{sha},
})
result := make([]pusher.BatchOutcome, len(batches))
for i, b := range batches {
outcomes := make([]pusher.ChangeOutcome, 0, len(perBatch[i]))
for _, change := range perBatch[i] {
sha := fmt.Sprintf("fake-%d", p.counter.Add(1))
outcomes = append(outcomes, pusher.ChangeOutcome{
Change: change,
Status: pusher.OutcomeStatusCommitted,
CommitSHAs: []string{sha},
})
}
result[i] = pusher.BatchOutcome{BatchID: b.ID, Outcomes: outcomes}
}
return pusher.Result{Outcomes: outcomes}, nil
return pusher.Result{Batches: result}, nil
}
26 changes: 12 additions & 14 deletions submitqueue/extension/pusher/fake/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,29 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
changesetfake "github.com/uber/submitqueue/submitqueue/core/changeset/fake"
"github.com/uber/submitqueue/submitqueue/entity"
"github.com/uber/submitqueue/submitqueue/extension/pusher"
)

func TestNew_ImplementsInterface(t *testing.T) {
var _ pusher.Pusher = New()
var _ pusher.Pusher = New(changesetfake.New())
}

func TestPusher_Push_Committed(t *testing.T) {
p := New()
changes := []entity.Change{
{URIs: []string{"github://owner/repo/pull/1/abc"}},
{URIs: []string{"github://owner/repo/pull/2/def"}},
}
p := New(changesetfake.New().Set("b", changes...))

res, err := p.Push(context.Background(), changes)
res, err := p.Push(context.Background(), []entity.Batch{{ID: "b"}})
require.NoError(t, err)
require.Len(t, res.Outcomes, len(changes))
require.Len(t, res.Batches, 1)
require.Len(t, res.Batches[0].Outcomes, len(changes))

seen := map[string]bool{}
for i, out := range res.Outcomes {
for i, out := range res.Batches[0].Outcomes {
assert.Equal(t, changes[i], out.Change)
assert.Equal(t, pusher.OutcomeStatusCommitted, out.Status)
require.Len(t, out.CommitSHAs, 1)
Expand All @@ -51,19 +53,15 @@ func TestPusher_Push_Committed(t *testing.T) {
}

func TestPusher_Push_ConflictMarker(t *testing.T) {
p := New()
_, err := p.Push(context.Background(), []entity.Change{
{URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=conflict"}},
})
p := New(changesetfake.New().Set("b", entity.Change{URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=conflict"}}))
_, err := p.Push(context.Background(), []entity.Batch{{ID: "b"}})
assert.True(t, errors.Is(err, pusher.ErrConflict))
}

func TestPusher_Push_ErrorMarker(t *testing.T) {
p := New()
res, err := p.Push(context.Background(), []entity.Change{
{URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=push-error"}},
})
p := New(changesetfake.New().Set("b", entity.Change{URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=push-error"}}))
res, err := p.Push(context.Background(), []entity.Batch{{ID: "b"}})
require.Error(t, err)
// Atomicity: on error no outcomes are reported.
assert.Empty(t, res.Outcomes)
assert.Empty(t, res.Batches)
}
2 changes: 2 additions & 0 deletions submitqueue/extension/pusher/git/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
"//submitqueue/core/changeset",
"//submitqueue/entity",
"//submitqueue/entity/github",
"//submitqueue/extension/pusher",
Expand All @@ -20,6 +21,7 @@ go_test(
srcs = ["git_pusher_test.go"],
embed = [":git"],
deps = [
"//submitqueue/core/changeset/fake",
"//submitqueue/entity",
"//submitqueue/extension/pusher",
"@com_github_stretchr_testify//assert",
Expand Down
35 changes: 33 additions & 2 deletions submitqueue/extension/pusher/git/git_pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"go.uber.org/zap"

coremetrics "github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/submitqueue/core/changeset"
"github.com/uber/submitqueue/submitqueue/entity"
entitygithub "github.com/uber/submitqueue/submitqueue/entity/github"
"github.com/uber/submitqueue/submitqueue/extension/pusher"
Expand All @@ -82,6 +83,8 @@ type Params struct {
Remote string
// Target is the destination branch ref on the remote (e.g. "main").
Target string
// Resolver resolves each batch's changes.
Resolver changeset.Resolver
// Logger is the structured logger.
Logger *zap.SugaredLogger
// MetricsScope is the metrics scope for instrumentation.
Expand All @@ -98,6 +101,7 @@ type gitPusher struct {
checkoutPath string
remote string
target string
resolver changeset.Resolver
logger *zap.SugaredLogger
metricsScope tally.Scope
maxPushAttempts int
Expand All @@ -121,17 +125,31 @@ func NewPusher(params Params) pusher.Pusher {
checkoutPath: params.CheckoutPath,
remote: params.Remote,
target: params.Target,
resolver: params.Resolver,
logger: params.Logger.Named("git_pusher"),
metricsScope: params.MetricsScope.SubScope("git_pusher"),
maxPushAttempts: maxAttempts,
}
}

// Push fulfils the pusher.Pusher contract.
func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret pusher.Result, retErr error) {
func (p *gitPusher) Push(ctx context.Context, batches []entity.Batch) (ret pusher.Result, retErr error) {
op := coremetrics.Begin(p.metricsScope, "push")
defer func() { op.Complete(retErr) }()

// Resolve each batch's changes, keeping per-batch counts so the flat
// outcomes can be regrouped per batch on success.
perBatch := make([][]entity.Change, len(batches))
var changes []entity.Change
for i, b := range batches {
cs, err := p.resolver.ChangesForBatch(ctx, b)
if err != nil {
return pusher.Result{}, fmt.Errorf("resolve batch %s: %w", b.ID, err)
}
perBatch[i] = cs
changes = append(changes, cs...)
}

p.mu.Lock()
defer p.mu.Unlock()

Expand All @@ -154,7 +172,7 @@ func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret push
"target", p.target,
"outcomes", outcomes,
)
return pusher.Result{Outcomes: outcomes}, nil
return pusher.Result{Batches: groupByBatch(batches, perBatch, outcomes)}, nil
}

// Was the failure caused by the remote tip moving under us between
Expand Down Expand Up @@ -190,6 +208,19 @@ func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret push
return pusher.Result{}, fmt.Errorf("exceeded %d push attempts due to remote contention: %w", p.maxPushAttempts, lastErr)
}

// groupByBatch splits the flat, apply-ordered outcomes back into one
// BatchOutcome per input batch, using each batch's resolved change count.
func groupByBatch(batches []entity.Batch, perBatch [][]entity.Change, outcomes []pusher.ChangeOutcome) []pusher.BatchOutcome {
result := make([]pusher.BatchOutcome, len(batches))
pos := 0
for i, b := range batches {
n := len(perBatch[i])
result[i] = pusher.BatchOutcome{BatchID: b.ID, Outcomes: outcomes[pos : pos+n]}
pos += n
}
return result
}

// tryPush runs one full reset+cherry-pick+push cycle. The returned baseSHA
// is the SHA the cycle was based on (set as soon as resetToRemote completes)
// so the caller can distinguish concurrent-push contention from other push
Expand Down
Loading
Loading