forked from Silverfish/proton-bridge
feat(GODT-2828): Increase sync progress report frequency
We now report sync progress after a batch completes each stage.
This commit is contained in:
@ -29,6 +29,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const DefaultRetryCoolDown = 20 * time.Second
|
const DefaultRetryCoolDown = 20 * time.Second
|
||||||
|
const NumSyncStages = 4
|
||||||
|
|
||||||
type LabelMap = map[string]proton.Label
|
type LabelMap = map[string]proton.Label
|
||||||
|
|
||||||
@ -187,7 +188,7 @@ func (t *Handler) run(ctx context.Context,
|
|||||||
syncStatus.TotalMessageCount = totalMessageCount
|
syncStatus.TotalMessageCount = totalMessageCount
|
||||||
}
|
}
|
||||||
|
|
||||||
syncReporter.InitializeProgressCounter(ctx, syncStatus.NumSyncedMessages, syncStatus.TotalMessageCount)
|
syncReporter.InitializeProgressCounter(ctx, syncStatus.NumSyncedMessages, syncStatus.TotalMessageCount*NumSyncStages)
|
||||||
|
|
||||||
if !syncStatus.HasMessages {
|
if !syncStatus.HasMessages {
|
||||||
t.log.Info("Syncing messages")
|
t.log.Info("Syncing messages")
|
||||||
|
|||||||
@ -57,7 +57,7 @@ func TestTask_NoStateAndSucceeds(t *testing.T) {
|
|||||||
})
|
})
|
||||||
call2 := tt.syncState.EXPECT().SetHasLabels(gomock.Any(), gomock.Eq(true)).After(call1).Times(1).Return(nil)
|
call2 := tt.syncState.EXPECT().SetHasLabels(gomock.Any(), gomock.Eq(true)).After(call1).Times(1).Return(nil)
|
||||||
call3 := tt.syncState.EXPECT().SetMessageCount(gomock.Any(), gomock.Eq(MessageTotal)).After(call2).Times(1).Return(nil)
|
call3 := tt.syncState.EXPECT().SetMessageCount(gomock.Any(), gomock.Eq(MessageTotal)).After(call2).Times(1).Return(nil)
|
||||||
tt.syncReporter.EXPECT().InitializeProgressCounter(gomock.Any(), gomock.Any(), gomock.Eq(MessageTotal))
|
tt.syncReporter.EXPECT().InitializeProgressCounter(gomock.Any(), gomock.Any(), gomock.Eq(MessageTotal*NumSyncStages))
|
||||||
call4 := tt.syncState.EXPECT().SetLastMessageID(gomock.Any(), gomock.Eq(MessageID), gomock.Eq(MessageDelta)).After(call3).Times(1).Return(nil)
|
call4 := tt.syncState.EXPECT().SetLastMessageID(gomock.Any(), gomock.Eq(MessageID), gomock.Eq(MessageDelta)).After(call3).Times(1).Return(nil)
|
||||||
call5 := tt.syncState.EXPECT().SetHasMessages(gomock.Any(), gomock.Eq(true)).After(call4).Times(1).Return(nil)
|
call5 := tt.syncState.EXPECT().SetHasMessages(gomock.Any(), gomock.Eq(true)).After(call4).Times(1).Return(nil)
|
||||||
tt.syncState.EXPECT().GetSyncStatus(gomock.Any()).After(call5).Times(1).DoAndReturn(func(_ context.Context) (Status, error) {
|
tt.syncState.EXPECT().GetSyncStatus(gomock.Any()).After(call5).Times(1).DoAndReturn(func(_ context.Context) (Status, error) {
|
||||||
@ -126,7 +126,7 @@ func TestTask_StateHasLabels(t *testing.T) {
|
|||||||
call3 := tt.syncState.EXPECT().SetMessageCount(gomock.Any(), gomock.Eq(MessageTotal)).After(call2).Times(1).Return(nil)
|
call3 := tt.syncState.EXPECT().SetMessageCount(gomock.Any(), gomock.Eq(MessageTotal)).After(call2).Times(1).Return(nil)
|
||||||
call4 := tt.syncState.EXPECT().SetLastMessageID(gomock.Any(), gomock.Eq(MessageID), gomock.Eq(MessageDelta)).After(call3).Times(1).Return(nil)
|
call4 := tt.syncState.EXPECT().SetLastMessageID(gomock.Any(), gomock.Eq(MessageID), gomock.Eq(MessageDelta)).After(call3).Times(1).Return(nil)
|
||||||
tt.syncState.EXPECT().SetHasMessages(gomock.Any(), gomock.Eq(true)).After(call4).Times(1).Return(nil)
|
tt.syncState.EXPECT().SetHasMessages(gomock.Any(), gomock.Eq(true)).After(call4).Times(1).Return(nil)
|
||||||
tt.syncReporter.EXPECT().InitializeProgressCounter(gomock.Any(), gomock.Any(), gomock.Eq(MessageTotal))
|
tt.syncReporter.EXPECT().InitializeProgressCounter(gomock.Any(), gomock.Any(), gomock.Eq(MessageTotal*NumSyncStages))
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -172,7 +172,7 @@ func TestTask_StateHasLabelsAndMessageCount(t *testing.T) {
|
|||||||
})
|
})
|
||||||
call4 := tt.syncState.EXPECT().SetLastMessageID(gomock.Any(), gomock.Eq(MessageID), gomock.Eq(MessageDelta)).After(call3).Times(1).Return(nil)
|
call4 := tt.syncState.EXPECT().SetLastMessageID(gomock.Any(), gomock.Eq(MessageID), gomock.Eq(MessageDelta)).After(call3).Times(1).Return(nil)
|
||||||
tt.syncState.EXPECT().SetHasMessages(gomock.Any(), gomock.Eq(true)).After(call4).Times(1).Return(nil)
|
tt.syncState.EXPECT().SetHasMessages(gomock.Any(), gomock.Eq(true)).After(call4).Times(1).Return(nil)
|
||||||
tt.syncReporter.EXPECT().InitializeProgressCounter(gomock.Any(), gomock.Any(), gomock.Eq(MessageTotal))
|
tt.syncReporter.EXPECT().InitializeProgressCounter(gomock.Any(), gomock.Any(), gomock.Eq(MessageTotal*NumSyncStages))
|
||||||
}
|
}
|
||||||
|
|
||||||
tt.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Eq(MessageDelta))
|
tt.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Eq(MessageDelta))
|
||||||
@ -222,7 +222,7 @@ func TestTask_RepeatsOnSyncFailure(t *testing.T) {
|
|||||||
|
|
||||||
tt.addMessageSyncCompletedExpectation(MessageID, MessageDelta)
|
tt.addMessageSyncCompletedExpectation(MessageID, MessageDelta)
|
||||||
|
|
||||||
tt.syncReporter.EXPECT().InitializeProgressCounter(gomock.Any(), gomock.Any(), gomock.Eq(MessageTotal))
|
tt.syncReporter.EXPECT().InitializeProgressCounter(gomock.Any(), gomock.Any(), gomock.Eq(MessageTotal*NumSyncStages))
|
||||||
|
|
||||||
{
|
{
|
||||||
call0 := tt.syncState.EXPECT().GetSyncStatus(gomock.Any()).DoAndReturn(func(_ context.Context) (Status, error) {
|
call0 := tt.syncState.EXPECT().GetSyncStatus(gomock.Any()).DoAndReturn(func(_ context.Context) (Status, error) {
|
||||||
|
|||||||
@ -108,6 +108,10 @@ func (j *Job) onError(err error) {
|
|||||||
j.cancel()
|
j.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (j *Job) onStageCompleted(ctx context.Context, count int64) {
|
||||||
|
j.syncReporter.OnProgress(ctx, count)
|
||||||
|
}
|
||||||
|
|
||||||
func (j *Job) onJobFinished(ctx context.Context, lastMessageID string, count int64) {
|
func (j *Job) onJobFinished(ctx context.Context, lastMessageID string, count int64) {
|
||||||
defer j.wg.Done()
|
defer j.wg.Done()
|
||||||
|
|
||||||
@ -219,6 +223,10 @@ func (s *childJob) onFinished(ctx context.Context) {
|
|||||||
s.job.downloadCache.DeleteAttachments(s.cachedAttachmentIDs...)
|
s.job.downloadCache.DeleteAttachments(s.cachedAttachmentIDs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *childJob) onStageCompleted(ctx context.Context) {
|
||||||
|
s.job.onStageCompleted(ctx, s.messageCount)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *childJob) checkCancelled() bool {
|
func (s *childJob) checkCancelled() bool {
|
||||||
err := s.job.ctx.Err()
|
err := s.job.ctx.Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -178,8 +178,12 @@ func (b *BuildStage) run(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
outJob := chunkedJobs[idx]
|
||||||
|
|
||||||
|
outJob.onStageCompleted(ctx)
|
||||||
|
|
||||||
b.output.Produce(ctx, ApplyRequest{
|
b.output.Produce(ctx, ApplyRequest{
|
||||||
childJob: chunkedJobs[idx],
|
childJob: outJob,
|
||||||
messages: success,
|
messages: success,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -90,6 +90,8 @@ func TestBuildStage_SuccessRemovesFailedMessage(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
tj.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Eq(int64(10)))
|
||||||
|
|
||||||
tj.job.begin()
|
tj.job.begin()
|
||||||
childJob := tj.job.newChildJob("f", 10)
|
childJob := tj.job.newChildJob("f", 10)
|
||||||
tj.job.end()
|
tj.job.end()
|
||||||
@ -160,6 +162,8 @@ func TestBuildStage_BuildFailureIsReportedButDoesNotCancelJob(t *testing.T) {
|
|||||||
"error": buildError,
|
"error": buildError,
|
||||||
})).Return(nil)
|
})).Return(nil)
|
||||||
|
|
||||||
|
tj.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Eq(int64(10)))
|
||||||
|
|
||||||
stage := NewBuildStage(input, output, 1024, &async.NoopPanicHandler{}, mockReporter)
|
stage := NewBuildStage(input, output, 1024, &async.NoopPanicHandler{}, mockReporter)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@ -210,6 +214,8 @@ func TestBuildStage_FailedToLocateKeyRingIsReportedButDoesNotFailBuild(t *testin
|
|||||||
"messageID": "MSG",
|
"messageID": "MSG",
|
||||||
})).Return(nil)
|
})).Return(nil)
|
||||||
|
|
||||||
|
tj.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Eq(int64(10)))
|
||||||
|
|
||||||
stage := NewBuildStage(input, output, 1024, &async.NoopPanicHandler{}, mockReporter)
|
stage := NewBuildStage(input, output, 1024, &async.NoopPanicHandler{}, mockReporter)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
|||||||
@ -181,6 +181,8 @@ func (d *DownloadStage) run(ctx context.Context) {
|
|||||||
request.cachedMessageIDs = request.ids
|
request.cachedMessageIDs = request.ids
|
||||||
|
|
||||||
// Step 5: Publish result.
|
// Step 5: Publish result.
|
||||||
|
request.onStageCompleted(ctx)
|
||||||
|
|
||||||
d.output.Produce(ctx, BuildRequest{
|
d.output.Produce(ctx, BuildRequest{
|
||||||
batch: result,
|
batch: result,
|
||||||
childJob: request.childJob,
|
childJob: request.childJob,
|
||||||
|
|||||||
@ -175,6 +175,8 @@ func TestDownloadStage_Run(t *testing.T) {
|
|||||||
tj.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Any())
|
tj.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Any())
|
||||||
tj.state.EXPECT().SetLastMessageID(gomock.Any(), gomock.Eq("f"), gomock.Eq(int64(10))).Return(nil)
|
tj.state.EXPECT().SetLastMessageID(gomock.Any(), gomock.Eq("f"), gomock.Eq(int64(10))).Return(nil)
|
||||||
|
|
||||||
|
tj.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Eq(int64(10)))
|
||||||
|
|
||||||
tj.job.begin()
|
tj.job.begin()
|
||||||
defer tj.job.end()
|
defer tj.job.end()
|
||||||
childJob := tj.job.newChildJob("f", 10)
|
childJob := tj.job.newChildJob("f", 10)
|
||||||
@ -216,6 +218,8 @@ func TestDownloadStage_RunWith422(t *testing.T) {
|
|||||||
tj.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Any())
|
tj.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Any())
|
||||||
tj.state.EXPECT().SetLastMessageID(gomock.Any(), gomock.Eq("f"), gomock.Eq(int64(10))).Return(nil)
|
tj.state.EXPECT().SetLastMessageID(gomock.Any(), gomock.Eq("f"), gomock.Eq(int64(10))).Return(nil)
|
||||||
|
|
||||||
|
tj.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Eq(int64(10)))
|
||||||
|
|
||||||
tj.job.begin()
|
tj.job.begin()
|
||||||
defer tj.job.end()
|
defer tj.job.end()
|
||||||
childJob := tj.job.newChildJob("f", 10)
|
childJob := tj.job.newChildJob("f", 10)
|
||||||
|
|||||||
@ -113,6 +113,8 @@ func (m *MetadataStage) run(ctx context.Context, metadataPageSize int, maxMessag
|
|||||||
state.stage.metadataFetched += int64(len(output.ids))
|
state.stage.metadataFetched += int64(len(output.ids))
|
||||||
job.log.Debugf("Metada collected: %v/%v", state.stage.metadataFetched, state.stage.totalMessageCount)
|
job.log.Debugf("Metada collected: %v/%v", state.stage.metadataFetched, state.stage.totalMessageCount)
|
||||||
|
|
||||||
|
output.onStageCompleted(ctx)
|
||||||
|
|
||||||
m.output.Produce(ctx, output)
|
m.output.Produce(ctx, output)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -62,6 +62,7 @@ func TestMetadataStage_RunFinishesWith429(t *testing.T) {
|
|||||||
input.Produce(ctx, tj.job)
|
input.Produce(ctx, tj.job)
|
||||||
|
|
||||||
for _, chunk := range xslices.Chunk(msgs, TestMaxMessages) {
|
for _, chunk := range xslices.Chunk(msgs, TestMaxMessages) {
|
||||||
|
tj.syncReporter.EXPECT().OnProgress(gomock.Any(), gomock.Eq(int64(len(chunk))))
|
||||||
req, err := output.Consume(ctx)
|
req, err := output.Consume(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, req.ids, xslices.Map(chunk, func(m proton.MessageMetadata) string {
|
require.Equal(t, req.ids, xslices.Map(chunk, func(m proton.MessageMetadata) string {
|
||||||
|
|||||||
Reference in New Issue
Block a user