fix(GODT-2693): Fix message appearing twice after sent

Previous attempt at fixing a bug in the send recorder (GODT-2627)
introduced a new problem where if the same message is sent multiple
times with different recipients it is possible to trigger a case where
the incorrect wait channel is chosen. This in turn led to IMAP client
not recognizing that message has been successfully submitted. This
case is represented by
`TestSendHashed_SameMessageWIthDifferentToListShouldWaitSuccessfullyAfterSend`
but could potentially happen over time or due some other
concurrency/scheduling wake up order.

To prevent this from happening every send recorder request now requires
that the full list of addresses be presented. This is necessary for us
to locate the correct entry and its respective wait channel.

Finally each unique send recorder request is assigned an ID, in order
to ensure make sure that if we ever need to cancel a request,
we don't accidentally cancel a similar request if the original was
removed from scope due to expiration.
This commit is contained in:
Leander Beernaert
2023-07-10 15:45:09 +02:00
parent cda6b2a728
commit 80194ad797
4 changed files with 195 additions and 92 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/ProtonMail/gluon/connector"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/rfc5322"
"github.com/ProtonMail/gluon/rfc822"
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/gopenpgp/v2/crypto"
@ -281,6 +282,11 @@ func (conn *imapConnector) CreateMessage(
return imap.Message{}, nil, connector.ErrOperationNotAllowed
}
toList, err := getLiteralToList(literal)
if err != nil {
return imap.Message{}, nil, fmt.Errorf("failed to retrieve addresses from literal:%w", err)
}
// Compute the hash of the message (to match it against SMTP messages).
hash, err := getMessageHash(literal)
if err != nil {
@ -288,7 +294,7 @@ func (conn *imapConnector) CreateMessage(
}
// Check if we already tried to send this message recently.
if messageID, ok, err := conn.sendHash.hasEntryWait(ctx, hash, time.Now().Add(90*time.Second)); err != nil {
if messageID, ok, err := conn.sendHash.hasEntryWait(ctx, hash, time.Now().Add(90*time.Second), toList); err != nil {
return imap.Message{}, nil, fmt.Errorf("failed to check send hash: %w", err)
} else if ok {
conn.log.WithField("messageID", messageID).Warn("Message already sent")
@ -723,3 +729,45 @@ func buildFlagSetFromMessageMetadata(message proton.MessageMetadata) imap.FlagSe
return flags
}
func getLiteralToList(literal []byte) ([]string, error) {
headerLiteral, _ := rfc822.Split(literal)
header, err := rfc822.NewHeader(headerLiteral)
if err != nil {
return nil, err
}
var result []string
parseAddress := func(field string) error {
if fieldAddr, ok := header.GetChecked(field); ok {
addr, err := rfc5322.ParseAddressList(fieldAddr)
if err != nil {
return fmt.Errorf("failed to parse addresses for '%v': %w", field, err)
}
result = append(result, xslices.Map(addr, func(addr *mail.Address) string {
return addr.Address
})...)
return nil
}
return nil
}
if err := parseAddress("To"); err != nil {
return nil, err
}
if err := parseAddress("Cc"); err != nil {
return nil, err
}
if err := parseAddress("Bcc"); err != nil {
return nil, err
}
return result, nil
}

View File

@ -32,11 +32,14 @@ import (
const sendEntryExpiry = 30 * time.Minute
type SendRecorderID uint64
type sendRecorder struct {
expiry time.Duration
entries map[string][]*sendEntry
entriesLock sync.Mutex
entries map[string][]*sendEntry
entriesLock sync.Mutex
cancelIDCounter uint64
}
func newSendRecorder(expiry time.Duration) *sendRecorder {
@ -47,6 +50,7 @@ func newSendRecorder(expiry time.Duration) *sendRecorder {
}
type sendEntry struct {
srID SendRecorderID
msgID string
toList []string
exp time.Time
@ -69,16 +73,17 @@ func (h *sendRecorder) tryInsertWait(
hash string,
toList []string,
deadline time.Time,
) (bool, error) {
) (SendRecorderID, bool, error) {
// If we successfully inserted the hash, we can return true.
if h.tryInsert(hash, toList) {
return true, nil
srID, waitCh, ok := h.tryInsert(hash, toList)
if ok {
return srID, true, nil
}
// A message with this hash is already being sent; wait for it.
_, wasSent, err := h.wait(ctx, hash, deadline)
_, wasSent, err := h.wait(ctx, hash, waitCh, srID, deadline)
if err != nil {
return false, fmt.Errorf("failed to wait for message to be sent: %w", err)
return 0, false, fmt.Errorf("failed to wait for message to be sent: %w", err)
}
// If the message failed to send, try to insert it again.
@ -86,18 +91,23 @@ func (h *sendRecorder) tryInsertWait(
return h.tryInsertWait(ctx, hash, toList, deadline)
}
return false, nil
return srID, false, nil
}
// hasEntryWait returns whether the given message already exists in the send recorder.
// If it does, it waits for its ID to be known, then returns it and true.
// If no entry exists, or it times out while waiting for its ID to be known, it returns false.
func (h *sendRecorder) hasEntryWait(ctx context.Context, hash string, deadline time.Time) (string, bool, error) {
if !h.hasEntry(hash) {
func (h *sendRecorder) hasEntryWait(ctx context.Context,
hash string,
deadline time.Time,
toList []string,
) (string, bool, error) {
srID, waitCh, found := h.getEntryWaitInfo(hash, toList)
if !found {
return "", false, nil
}
messageID, wasSent, err := h.wait(ctx, hash, deadline)
messageID, wasSent, err := h.wait(ctx, hash, waitCh, srID, deadline)
if errors.Is(err, context.DeadlineExceeded) {
return "", false, nil
} else if err != nil {
@ -108,7 +118,7 @@ func (h *sendRecorder) hasEntryWait(ctx context.Context, hash string, deadline t
return messageID, true, nil
}
return h.hasEntryWait(ctx, hash, deadline)
return h.hasEntryWait(ctx, hash, deadline, toList)
}
func (h *sendRecorder) removeExpiredUnsafe() {
@ -125,7 +135,7 @@ func (h *sendRecorder) removeExpiredUnsafe() {
}
}
func (h *sendRecorder) tryInsert(hash string, toList []string) bool {
func (h *sendRecorder) tryInsert(hash string, toList []string) (SendRecorderID, <-chan struct{}, bool) {
h.entriesLock.Lock()
defer h.entriesLock.Unlock()
@ -135,42 +145,50 @@ func (h *sendRecorder) tryInsert(hash string, toList []string) bool {
if ok {
for _, entry := range entries {
if matchToList(entry.toList, toList) {
return false
return entry.srID, entry.waitCh, false
}
}
}
cancelID := h.newSendRecorderID()
waitCh := make(chan struct{})
h.entries[hash] = append(entries, &sendEntry{
srID: cancelID,
exp: time.Now().Add(h.expiry),
toList: toList,
waitCh: make(chan struct{}),
waitCh: waitCh,
})
return true
return cancelID, waitCh, true
}
func (h *sendRecorder) hasEntry(hash string) bool {
func (h *sendRecorder) getEntryWaitInfo(hash string, toList []string) (SendRecorderID, <-chan struct{}, bool) {
h.entriesLock.Lock()
defer h.entriesLock.Unlock()
h.removeExpiredUnsafe()
if _, ok := h.entries[hash]; ok {
return true
if entries, ok := h.entries[hash]; ok {
for _, e := range entries {
if matchToList(e.toList, toList) {
return e.srID, e.waitCh, true
}
}
}
return false
return 0, nil, false
}
// signalMessageSent should be called after a message has been successfully sent.
func (h *sendRecorder) signalMessageSent(hash, msgID string, toList []string) {
func (h *sendRecorder) signalMessageSent(hash string, srID SendRecorderID, msgID string) {
h.entriesLock.Lock()
defer h.entriesLock.Unlock()
entries, ok := h.entries[hash]
if ok {
for _, entry := range entries {
if matchToList(entry.toList, toList) {
if entry.srID == srID {
entry.msgID = msgID
entry.closeWaitChannel()
return
@ -181,7 +199,7 @@ func (h *sendRecorder) signalMessageSent(hash, msgID string, toList []string) {
logrus.Warn("Cannot add message ID to send hash entry, it may have expired")
}
func (h *sendRecorder) removeOnFail(hash string, toList []string) {
func (h *sendRecorder) removeOnFail(hash string, id SendRecorderID) {
h.entriesLock.Lock()
defer h.entriesLock.Unlock()
@ -191,7 +209,7 @@ func (h *sendRecorder) removeOnFail(hash string, toList []string) {
}
for idx, entry := range entries {
if entry.msgID == "" && matchToList(entry.toList, toList) {
if entry.srID == id && entry.msgID == "" {
entry.closeWaitChannel()
remaining := xslices.Remove(entries, idx, 1)
@ -204,15 +222,16 @@ func (h *sendRecorder) removeOnFail(hash string, toList []string) {
}
}
func (h *sendRecorder) wait(ctx context.Context, hash string, deadline time.Time) (string, bool, error) {
func (h *sendRecorder) wait(
ctx context.Context,
hash string,
waitCh <-chan struct{},
srID SendRecorderID,
deadline time.Time,
) (string, bool, error) {
ctx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
waitCh, ok := h.getWaitCh(hash)
if !ok {
return "", false, nil
}
select {
case <-ctx.Done():
return "", false, ctx.Err()
@ -225,21 +244,19 @@ func (h *sendRecorder) wait(ctx context.Context, hash string, deadline time.Time
defer h.entriesLock.Unlock()
if entry, ok := h.entries[hash]; ok {
return entry[0].msgID, true, nil
for _, e := range entry {
if e.srID == srID {
return e.msgID, true, nil
}
}
}
return "", false, nil
}
func (h *sendRecorder) getWaitCh(hash string) (<-chan struct{}, bool) {
h.entriesLock.Lock()
defer h.entriesLock.Unlock()
if entry, ok := h.entries[hash]; ok {
return entry[0].waitCh, true
}
return nil, false
func (h *sendRecorder) newSendRecorderID() SendRecorderID {
h.cancelIDCounter++
return SendRecorderID(h.cancelIDCounter)
}
// getMessageHash returns the hash of the given message.

View File

@ -29,67 +29,73 @@ func TestSendHasher_Insert(t *testing.T) {
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash1, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
srdID1, hash1, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash1)
// Simulate successfully sending the message.
h.signalMessageSent(hash1, "abc", nil)
h.signalMessageSent(hash1, srdID1, "abc")
// Inserting a message with the same hash should return false.
_, ok, err = testTryInsert(h, literal1, time.Now().Add(time.Second))
srdID2, _, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.False(t, ok)
require.Equal(t, srdID1, srdID2)
// Inserting a message with a different hash should return true.
hash2, ok, err := testTryInsert(h, literal2, time.Now().Add(time.Second))
srdID3, hash2, ok, err := testTryInsert(h, literal2, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash2)
require.NotEqual(t, srdID3, srdID1)
}
func TestSendHasher_Insert_Expired(t *testing.T) {
h := newSendRecorder(time.Second)
// Insert a message into the hasher.
hash1, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
srID1, hash1, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash1)
// Simulate successfully sending the message.
h.signalMessageSent(hash1, "abc", nil)
h.signalMessageSent(hash1, srID1, "abc")
// Wait for the entry to expire.
time.Sleep(time.Second)
// Inserting a message with the same hash should return true because the previous entry has since expired.
hash2, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
srID2, hash2, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
// The hashes should be the same.
require.Equal(t, hash1, hash2)
// Send IDs should differ
require.NotEqual(t, srID2, srID1)
}
func TestSendHasher_Insert_DifferentToList(t *testing.T) {
h := newSendRecorder(time.Second)
// Insert a message into the hasher.
hash1, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second), []string{"abc", "def"}...)
srID1, hash1, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second), []string{"abc", "def"}...)
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash1)
// Insert the same message into the hasher but with a different to list.
hash2, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second), []string{"abc", "def", "ghi"}...)
srID2, hash2, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second), []string{"abc", "def", "ghi"}...)
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash2)
require.NotEqual(t, srID1, srID2)
// Insert the same message into the hasher but with the same to list.
_, ok, err = testTryInsert(h, literal1, time.Now().Add(time.Second), []string{"abc", "def", "ghi"}...)
_, _, ok, err = testTryInsert(h, literal1, time.Now().Add(time.Second), []string{"abc", "def", "ghi"}...)
require.Error(t, err)
require.False(t, ok)
}
@ -98,7 +104,7 @@ func TestSendHasher_Wait_SendSuccess(t *testing.T) {
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
srID1, hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash)
@ -106,20 +112,21 @@ func TestSendHasher_Wait_SendSuccess(t *testing.T) {
// Simulate successfully sending the message after half a second.
go func() {
time.Sleep(time.Millisecond * 500)
h.signalMessageSent(hash, "abc", nil)
h.signalMessageSent(hash, srID1, "abc")
}()
// Inserting a message with the same hash should fail.
_, ok, err = testTryInsert(h, literal1, time.Now().Add(time.Second))
srID2, _, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.False(t, ok)
require.Equal(t, srID1, srID2)
}
func TestSendHasher_Wait_SendFail(t *testing.T) {
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
srID1, hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash)
@ -127,13 +134,14 @@ func TestSendHasher_Wait_SendFail(t *testing.T) {
// Simulate failing to send the message after half a second.
go func() {
time.Sleep(time.Millisecond * 500)
h.removeOnFail(hash, nil)
h.removeOnFail(hash, srID1)
}()
// Inserting a message with the same hash should succeed because the first message failed to send.
hash2, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
srID2, hash2, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEqual(t, srID2, srID1)
// The hashes should be the same.
require.Equal(t, hash, hash2)
@ -143,13 +151,13 @@ func TestSendHasher_Wait_Timeout(t *testing.T) {
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
_, hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash)
// We should fail to insert because the message is not sent within the timeout period.
_, _, err = testTryInsert(h, literal1, time.Now().Add(time.Second))
_, _, _, err = testTryInsert(h, literal1, time.Now().Add(time.Second))
require.Error(t, err)
}
@ -157,13 +165,13 @@ func TestSendHasher_HasEntry(t *testing.T) {
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
srID1, hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash)
// Simulate successfully sending the message.
h.signalMessageSent(hash, "abc", nil)
h.signalMessageSent(hash, srID1, "abc")
// The message was already sent; we should find it in the hasher.
messageID, ok, err := testHasEntry(h, literal1, time.Now().Add(time.Second))
@ -176,7 +184,7 @@ func TestSendHasher_HasEntry_SendSuccess(t *testing.T) {
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
srID1, hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash)
@ -184,7 +192,7 @@ func TestSendHasher_HasEntry_SendSuccess(t *testing.T) {
// Simulate successfully sending the message after half a second.
go func() {
time.Sleep(time.Millisecond * 500)
h.signalMessageSent(hash, "abc", nil)
h.signalMessageSent(hash, srID1, "abc")
}()
// The message was already sent; we should find it in the hasher.
@ -202,15 +210,15 @@ func TestSendHasher_DualAddDoesNotCauseCrash(t *testing.T) {
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
srID1, hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash)
// Simulate successfully sending the message. We call this method twice as it possible for multiple SMTP connections
// to attempt to send the same message.
h.signalMessageSent(hash, "abc", nil)
h.signalMessageSent(hash, "abc", nil)
h.signalMessageSent(hash, srID1, "abc")
h.signalMessageSent(hash, srID1, "abc")
// The message was already sent; we should find it in the hasher.
messageID, ok, err := testHasEntry(h, literal1, time.Now().Add(time.Second))
@ -223,23 +231,52 @@ func TestSendHashed_MessageWithSameHasButDifferentRecipientsIsInserted(t *testin
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second), "Receiver <receiver@pm.me>")
srID1, hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second), "Receiver <receiver@pm.me>")
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash)
hash2, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second), "Receiver <receiver@pm.me>", "Receiver2 <receiver2@pm.me>")
srID2, hash2, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second), "Receiver <receiver@pm.me>", "Receiver2 <receiver2@pm.me>")
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash2)
require.Equal(t, hash, hash2)
// Should map to different requests
require.NotEqual(t, srID2, srID1)
}
func TestSendHashed_SameMessageWIthDifferentToListShouldWaitSuccessfullyAfterSend(t *testing.T) {
// Check that if we send the same message twice with different recipients and the second message is somehow
// sent before the first, ensure that we check if the message was sent we wait on the correct object.
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
_, hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Minute), "Receiver <receiver@pm.me>")
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash)
srID2, hash2, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Microsecond), "Receiver <receiver@pm.me>", "Receiver2 <receiver2@pm.me>")
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash2)
require.Equal(t, hash, hash2)
// simulate message sent
h.signalMessageSent(hash2, srID2, "newID")
// Simulate Wait on message 2
_, ok, err = h.hasEntryWait(context.Background(), hash2, time.Now().Add(time.Second), []string{"Receiver <receiver@pm.me>", "Receiver2 <receiver2@pm.me>"})
require.NoError(t, err)
require.True(t, ok)
}
func TestSendHasher_HasEntry_SendFail(t *testing.T) {
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
srID1, hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash)
@ -247,7 +284,7 @@ func TestSendHasher_HasEntry_SendFail(t *testing.T) {
// Simulate failing to send the message after half a second.
go func() {
time.Sleep(time.Millisecond * 500)
h.removeOnFail(hash, nil)
h.removeOnFail(hash, srID1)
}()
// The message failed to send; we should not find it in the hasher.
@ -260,7 +297,7 @@ func TestSendHasher_HasEntry_Timeout(t *testing.T) {
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
_, hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash)
@ -275,13 +312,13 @@ func TestSendHasher_HasEntry_Expired(t *testing.T) {
h := newSendRecorder(time.Second)
// Insert a message into the hasher.
hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
srID1, hash, ok, err := testTryInsert(h, literal1, time.Now().Add(time.Second))
require.NoError(t, err)
require.True(t, ok)
require.NotEmpty(t, hash)
// Simulate successfully sending the message.
h.signalMessageSent(hash, "abc", nil)
h.signalMessageSent(hash, srID1, "abc")
// Wait for the entry to expire.
time.Sleep(time.Second)
@ -410,25 +447,25 @@ func TestGetMessageHash(t *testing.T) {
}
}
func testTryInsert(h *sendRecorder, literal string, deadline time.Time, toList ...string) (string, bool, error) { //nolint:unparam
func testTryInsert(h *sendRecorder, literal string, deadline time.Time, toList ...string) (SendRecorderID, string, bool, error) { //nolint:unparam
hash, err := getMessageHash([]byte(literal))
if err != nil {
return 0, "", false, err
}
srID, ok, err := h.tryInsertWait(context.Background(), hash, toList, deadline)
if err != nil {
return 0, "", false, err
}
return srID, hash, ok, nil
}
func testHasEntry(h *sendRecorder, literal string, deadline time.Time, toList ...string) (string, bool, error) { //nolint:unparam
hash, err := getMessageHash([]byte(literal))
if err != nil {
return "", false, err
}
ok, err := h.tryInsertWait(context.Background(), hash, toList, deadline)
if err != nil {
return "", false, err
}
return hash, ok, nil
}
func testHasEntry(h *sendRecorder, literal string, deadline time.Time) (string, bool, error) { //nolint:unparam
hash, err := getMessageHash([]byte(literal))
if err != nil {
return "", false, err
}
return h.hasEntryWait(context.Background(), hash, deadline)
return h.hasEntryWait(context.Background(), hash, deadline, toList)
}

View File

@ -81,7 +81,8 @@ func (user *User) sendMail(authID string, from string, to []string, r io.Reader)
}
// Check if we already tried to send this message recently.
if ok, err := user.sendHash.tryInsertWait(ctx, hash, to, time.Now().Add(90*time.Second)); err != nil {
srID, ok, err := user.sendHash.tryInsertWait(ctx, hash, to, time.Now().Add(90*time.Second))
if err != nil {
return fmt.Errorf("failed to check send hash: %w", err)
} else if !ok {
user.log.Warn("A duplicate message was already sent recently, skipping")
@ -89,7 +90,7 @@ func (user *User) sendMail(authID string, from string, to []string, r io.Reader)
}
// If we fail to send this message, we should remove the hash from the send recorder.
defer user.sendHash.removeOnFail(hash, to)
defer user.sendHash.removeOnFail(hash, srID)
// Create a new message parser from the reader.
parser, err := parser.New(bytes.NewReader(b))
@ -162,7 +163,7 @@ func (user *User) sendMail(authID string, from string, to []string, r io.Reader)
}
// If the message was successfully sent, we can update the message ID in the record.
user.sendHash.signalMessageSent(hash, sent.ID, to)
user.sendHash.signalMessageSent(hash, srID, sent.ID)
return nil
})