Other(refactor): Use normal value + mutex for user.updateCh
This commit is contained in:
@ -113,18 +113,14 @@ func (user *User) handleCreateAddressEvent(ctx context.Context, event liteapi.Ad
|
||||
return fmt.Errorf("failed to get primary address: %w", err)
|
||||
}
|
||||
|
||||
user.updateCh.SetFrom(event.Address.ID, primAddr.ID)
|
||||
user.updateCh[event.Address.ID] = user.updateCh[primAddr.ID]
|
||||
|
||||
case vault.SplitMode:
|
||||
user.updateCh.Set(event.Address.ID, queue.NewQueuedChannel[imap.Update](0, 0))
|
||||
user.updateCh[event.Address.ID] = queue.NewQueuedChannel[imap.Update](0, 0)
|
||||
}
|
||||
|
||||
if user.vault.AddressMode() == vault.SplitMode {
|
||||
if ok, err := user.updateCh.GetErr(event.Address.ID, func(updateCh *queue.QueuedChannel[imap.Update]) error {
|
||||
return syncLabels(ctx, user.client, updateCh)
|
||||
}); !ok {
|
||||
return fmt.Errorf("no such address %q", event.Address.ID)
|
||||
} else if err != nil {
|
||||
if err := syncLabels(ctx, user.client, user.updateCh[event.Address.ID]); err != nil {
|
||||
return fmt.Errorf("failed to sync labels to new address: %w", err)
|
||||
}
|
||||
}
|
||||
@ -136,7 +132,7 @@ func (user *User) handleCreateAddressEvent(ctx context.Context, event liteapi.Ad
|
||||
})
|
||||
|
||||
return nil
|
||||
}, &user.apiAddrsLock)
|
||||
}, &user.apiAddrsLock, &user.updateChLock)
|
||||
}
|
||||
|
||||
func (user *User) handleUpdateAddressEvent(_ context.Context, event liteapi.AddressEvent) error { //nolint:unparam
|
||||
@ -154,7 +150,7 @@ func (user *User) handleUpdateAddressEvent(_ context.Context, event liteapi.Addr
|
||||
})
|
||||
|
||||
return nil
|
||||
})
|
||||
}, &user.apiAddrsLock)
|
||||
}
|
||||
|
||||
func (user *User) handleDeleteAddressEvent(_ context.Context, event liteapi.AddressEvent) error {
|
||||
@ -164,14 +160,13 @@ func (user *User) handleDeleteAddressEvent(_ context.Context, event liteapi.Addr
|
||||
return fmt.Errorf("address %q does not exist", event.ID)
|
||||
}
|
||||
|
||||
if ok := user.updateCh.GetDelete(event.ID, func(updateCh *queue.QueuedChannel[imap.Update]) {
|
||||
if user.vault.AddressMode() == vault.SplitMode {
|
||||
updateCh.CloseAndDiscardQueued()
|
||||
}
|
||||
}); !ok {
|
||||
return fmt.Errorf("no such address %q", event.ID)
|
||||
if user.vault.AddressMode() == vault.SplitMode {
|
||||
user.updateCh[event.ID].CloseAndDiscardQueued()
|
||||
delete(user.updateCh, event.ID)
|
||||
}
|
||||
|
||||
delete(user.apiAddrs, event.ID)
|
||||
|
||||
user.eventCh.Enqueue(events.UserAddressDeleted{
|
||||
UserID: user.ID(),
|
||||
AddressID: event.ID,
|
||||
@ -179,7 +174,7 @@ func (user *User) handleDeleteAddressEvent(_ context.Context, event liteapi.Addr
|
||||
})
|
||||
|
||||
return nil
|
||||
})
|
||||
}, &user.apiAddrsLock, &user.updateChLock)
|
||||
}
|
||||
|
||||
// handleLabelEvents handles the given label events.
|
||||
@ -214,9 +209,9 @@ func (user *User) handleCreateLabelEvent(_ context.Context, event liteapi.LabelE
|
||||
|
||||
user.apiLabels[event.Label.ID] = event.Label
|
||||
|
||||
user.updateCh.IterValues(func(updateCh *queue.QueuedChannel[imap.Update]) {
|
||||
for _, updateCh := range user.updateCh {
|
||||
updateCh.Enqueue(newMailboxCreatedUpdate(imap.MailboxID(event.ID), getMailboxName(event.Label)))
|
||||
})
|
||||
}
|
||||
|
||||
user.eventCh.Enqueue(events.UserLabelCreated{
|
||||
UserID: user.ID(),
|
||||
@ -225,7 +220,7 @@ func (user *User) handleCreateLabelEvent(_ context.Context, event liteapi.LabelE
|
||||
})
|
||||
|
||||
return nil
|
||||
}, &user.apiLabelsLock)
|
||||
}, &user.apiLabelsLock, &user.updateChLock)
|
||||
}
|
||||
|
||||
func (user *User) handleUpdateLabelEvent(_ context.Context, event liteapi.LabelEvent) error { //nolint:unparam
|
||||
@ -236,9 +231,9 @@ func (user *User) handleUpdateLabelEvent(_ context.Context, event liteapi.LabelE
|
||||
|
||||
user.apiLabels[event.Label.ID] = event.Label
|
||||
|
||||
user.updateCh.IterValues(func(updateCh *queue.QueuedChannel[imap.Update]) {
|
||||
for _, updateCh := range user.updateCh {
|
||||
updateCh.Enqueue(imap.NewMailboxUpdated(imap.MailboxID(event.ID), getMailboxName(event.Label)))
|
||||
})
|
||||
}
|
||||
|
||||
user.eventCh.Enqueue(events.UserLabelUpdated{
|
||||
UserID: user.ID(),
|
||||
@ -247,7 +242,7 @@ func (user *User) handleUpdateLabelEvent(_ context.Context, event liteapi.LabelE
|
||||
})
|
||||
|
||||
return nil
|
||||
}, &user.apiLabelsLock)
|
||||
}, &user.apiLabelsLock, &user.updateChLock)
|
||||
}
|
||||
|
||||
func (user *User) handleDeleteLabelEvent(_ context.Context, event liteapi.LabelEvent) error { //nolint:unparam
|
||||
@ -259,9 +254,9 @@ func (user *User) handleDeleteLabelEvent(_ context.Context, event liteapi.LabelE
|
||||
|
||||
delete(user.apiLabels, event.ID)
|
||||
|
||||
user.updateCh.IterValues(func(updateCh *queue.QueuedChannel[imap.Update]) {
|
||||
for _, updateCh := range user.updateCh {
|
||||
updateCh.Enqueue(imap.NewMailboxDeleted(imap.MailboxID(event.ID)))
|
||||
})
|
||||
}
|
||||
|
||||
user.eventCh.Enqueue(events.UserLabelDeleted{
|
||||
UserID: user.ID(),
|
||||
@ -270,7 +265,7 @@ func (user *User) handleDeleteLabelEvent(_ context.Context, event liteapi.LabelE
|
||||
})
|
||||
|
||||
return nil
|
||||
}, &user.apiLabelsLock)
|
||||
}, &user.apiLabelsLock, &user.updateChLock)
|
||||
}
|
||||
|
||||
// handleMessageEvents handles the given message events.
|
||||
@ -308,28 +303,26 @@ func (user *User) handleCreateMessageEvent(ctx context.Context, event liteapi.Me
|
||||
return fmt.Errorf("failed to build RFC822 message: %w", err)
|
||||
}
|
||||
|
||||
user.updateCh.Get(full.AddressID, func(updateCh *queue.QueuedChannel[imap.Update]) {
|
||||
updateCh.Enqueue(imap.NewMessagesCreated(buildRes.update))
|
||||
})
|
||||
user.updateCh[full.AddressID].Enqueue(imap.NewMessagesCreated(buildRes.update))
|
||||
|
||||
return nil
|
||||
})
|
||||
}, &user.apiUserLock, &user.apiAddrsLock)
|
||||
}, &user.apiUserLock, &user.apiAddrsLock, &user.updateChLock)
|
||||
}
|
||||
|
||||
func (user *User) handleUpdateMessageEvent(_ context.Context, event liteapi.MessageEvent) error { //nolint:unparam
|
||||
update := imap.NewMessageMailboxesUpdated(
|
||||
imap.MessageID(event.ID),
|
||||
mapTo[string, imap.MailboxID](xslices.Filter(event.Message.LabelIDs, wantLabelID)),
|
||||
event.Message.Seen(),
|
||||
event.Message.Starred(),
|
||||
)
|
||||
return safe.RLockRet(func() error {
|
||||
update := imap.NewMessageMailboxesUpdated(
|
||||
imap.MessageID(event.ID),
|
||||
mapTo[string, imap.MailboxID](xslices.Filter(event.Message.LabelIDs, wantLabelID)),
|
||||
event.Message.Seen(),
|
||||
event.Message.Starred(),
|
||||
)
|
||||
|
||||
user.updateCh.Get(event.Message.AddressID, func(updateCh *queue.QueuedChannel[imap.Update]) {
|
||||
updateCh.Enqueue(update)
|
||||
})
|
||||
user.updateCh[event.Message.AddressID].Enqueue(update)
|
||||
|
||||
return nil
|
||||
return nil
|
||||
}, &user.updateChLock)
|
||||
}
|
||||
|
||||
func getMailboxName(label liteapi.Label) []string {
|
||||
|
||||
Reference in New Issue
Block a user