GODT-1777: Message de-duplication in IMAP (+ cleanup)

This commit is contained in:
James Houlahan
2022-10-23 19:02:26 +02:00
parent c9808d07df
commit 036a416a25
7 changed files with 167 additions and 148 deletions

2
go.mod
View File

@ -40,7 +40,7 @@ require (
github.com/urfave/cli/v2 v2.20.3
gitlab.protontech.ch/go/liteapi v0.36.0
go.uber.org/goleak v1.2.0
golang.org/x/exp v0.0.0-20220921164117-439092de6870
golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e
golang.org/x/net v0.1.0
golang.org/x/sys v0.1.0
golang.org/x/text v0.4.0

4
go.sum
View File

@ -427,8 +427,8 @@ golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxT
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4=
golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek=
golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
golang.org/x/exp v0.0.0-20220921164117-439092de6870 h1:j8b6j9gzSigH28O5SjSpQSSh9lFd6f5D/q0aHjNTulc=
golang.org/x/exp v0.0.0-20220921164117-439092de6870/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e h1:SkwG94eNiiYJhbeDE018Grw09HIN/KB9NlRmZsrzfWs=
golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=

View File

@ -87,30 +87,7 @@ func (conn *imapConnector) GetMailbox(ctx context.Context, mailboxID imap.Mailbo
return imap.Mailbox{}, err
}
var name []string
switch label.Type {
case liteapi.LabelTypeLabel:
name = []string{labelPrefix, label.Name}
case liteapi.LabelTypeFolder:
name = []string{folderPrefix, label.Name}
case liteapi.LabelTypeContactGroup:
fallthrough
case liteapi.LabelTypeSystem:
fallthrough
default:
name = []string{label.Name}
}
return imap.Mailbox{
ID: imap.MailboxID(label.ID),
Name: name,
Flags: conn.flags,
PermanentFlags: conn.permFlags,
Attributes: conn.attrs,
}, nil
return toIMAPMailbox(label, conn.flags, conn.permFlags, conn.attrs), nil
}
// CreateMailbox creates a label with the given name.
@ -136,13 +113,7 @@ func (conn *imapConnector) CreateMailbox(ctx context.Context, name []string) (im
return imap.Mailbox{}, err
}
return imap.Mailbox{
ID: imap.MailboxID(label.ID),
Name: name,
Flags: conn.flags,
PermanentFlags: conn.permFlags,
Attributes: conn.attrs,
}, nil
return toIMAPMailbox(label, conn.flags, conn.permFlags, conn.attrs), nil
}
// UpdateMailboxName sets the name of the label with the given ID.
@ -196,21 +167,7 @@ func (conn *imapConnector) GetMessage(ctx context.Context, messageID imap.Messag
return imap.Message{}, nil, err
}
flags := imap.NewFlagSet()
if !message.Unread {
flags = flags.Add(imap.FlagSeen)
}
if slices.Contains(message.LabelIDs, liteapi.StarredLabel) {
flags = flags.Add(imap.FlagFlagged)
}
return imap.Message{
ID: imap.MessageID(message.ID),
Flags: flags,
Date: time.Unix(message.Time, 0),
}, mapTo[string, imap.MailboxID](message.LabelIDs), nil
return toIMAPMessage(message.MessageMetadata), mapTo[string, imap.MailboxID](message.LabelIDs), nil
}
// CreateMessage creates a new message on the remote.
@ -223,75 +180,44 @@ func (conn *imapConnector) CreateMessage(
flags imap.FlagSet,
date time.Time,
) (imap.Message, []byte, error) {
var msgFlags liteapi.MessageFlag
// Check if we already tried to send this message recently.
if messageID, ok, err := conn.sendHash.hasEntryWait(ctx, literal, time.Now().Add(90*time.Second)); err != nil {
return imap.Message{}, nil, fmt.Errorf("failed to check send hash: %w", err)
} else if ok {
message, err := conn.client.GetMessage(ctx, messageID)
if err != nil {
return imap.Message{}, nil, fmt.Errorf("failed to fetch message: %w", err)
}
return toIMAPMessage(message.MessageMetadata), nil, nil
}
wantLabelIDs := []string{string(mailboxID)}
if flags.Contains(imap.FlagFlagged) {
wantLabelIDs = append(wantLabelIDs, liteapi.StarredLabel)
}
var wantFlags liteapi.MessageFlag
if mailboxID != liteapi.DraftsLabel {
header, _ := rfc822.Split(literal)
parsed, err := rfc822.NewHeader(header)
header, err := rfc822.Parse(literal).ParseHeader()
if err != nil {
return imap.Message{}, nil, err
}
if parsed.Has("Received") {
msgFlags = msgFlags.Add(liteapi.MessageFlagReceived)
if header.Has("Received") {
wantFlags = wantFlags.Add(liteapi.MessageFlagReceived)
} else {
msgFlags = msgFlags.Add(liteapi.MessageFlagSent)
wantFlags = wantFlags.Add(liteapi.MessageFlagSent)
}
}
var labelIDs []imap.MailboxID
if flags.Contains(imap.FlagFlagged) {
labelIDs = append(labelIDs, liteapi.StarredLabel)
}
if flags.Contains(imap.FlagAnswered) {
msgFlags = msgFlags.Add(liteapi.MessageFlagReplied)
wantFlags = wantFlags.Add(liteapi.MessageFlagReplied)
}
var (
messageID string
imported []byte
)
if err := conn.withAddrKR(conn.addrID, func(_, addrKR *crypto.KeyRing) error {
res, err := stream.Collect(ctx, conn.client.ImportMessages(ctx, addrKR, 1, 1, []liteapi.ImportReq{{
Metadata: liteapi.ImportMetadata{
AddressID: conn.addrID,
LabelIDs: mapTo[imap.MailboxID, string](append(labelIDs, mailboxID)),
Unread: !liteapi.Bool(flags.Contains(imap.FlagSeen)),
Flags: msgFlags,
},
Message: literal,
}}...))
if err != nil {
return fmt.Errorf("failed to import message: %w", err)
}
full, err := conn.client.GetFullMessage(ctx, res[0].MessageID)
if err != nil {
return fmt.Errorf("failed to fetch imported message: %w", err)
}
b, err := message.BuildRFC822(addrKR, full.Message, full.AttData, defaultJobOpts())
if err != nil {
return fmt.Errorf("failed to build message: %w", err)
}
messageID = full.ID
imported = b
return nil
}); err != nil {
return imap.Message{}, nil, err
}
return imap.Message{
ID: imap.MessageID(messageID),
Flags: flags,
Date: date,
}, imported, nil
return conn.importMessage(ctx, literal, wantLabelIDs, wantFlags, !flags.Contains(imap.FlagSeen))
}
// AddMessagesToMailbox labels the given messages with the given label ID.
@ -367,3 +293,82 @@ func (conn *imapConnector) Close(ctx context.Context) error {
func (conn *imapConnector) IsMailboxVisible(_ context.Context, mailboxID imap.MailboxID) bool {
return atomic.LoadUint32(&conn.showAllMail) != 0 || mailboxID != liteapi.AllMailLabel
}
func (conn *imapConnector) importMessage(
ctx context.Context,
literal []byte,
labelIDs []string,
flags liteapi.MessageFlag,
unread bool,
) (imap.Message, []byte, error) {
var full liteapi.FullMessage
if err := conn.withAddrKR(conn.addrID, func(_, addrKR *crypto.KeyRing) error {
res, err := stream.Collect(ctx, conn.client.ImportMessages(ctx, addrKR, 1, 1, []liteapi.ImportReq{{
Metadata: liteapi.ImportMetadata{
AddressID: conn.addrID,
LabelIDs: labelIDs,
Unread: liteapi.Bool(unread),
Flags: flags,
},
Message: literal,
}}...))
if err != nil {
return fmt.Errorf("failed to import message: %w", err)
}
if full, err = conn.client.GetFullMessage(ctx, res[0].MessageID); err != nil {
return fmt.Errorf("failed to fetch message: %w", err)
}
if literal, err = message.BuildRFC822(addrKR, full.Message, full.AttData, defaultJobOpts()); err != nil {
return fmt.Errorf("failed to build message: %w", err)
}
return nil
}); err != nil {
return imap.Message{}, nil, err
}
return toIMAPMessage(full.MessageMetadata), literal, nil
}
func toIMAPMessage(message liteapi.MessageMetadata) imap.Message {
flags := imap.NewFlagSet()
if !message.Unread {
flags = flags.Add(imap.FlagSeen)
}
if slices.Contains(message.LabelIDs, liteapi.StarredLabel) {
flags = flags.Add(imap.FlagFlagged)
}
if slices.Contains(message.LabelIDs, liteapi.DraftsLabel) {
flags = flags.Add(imap.FlagDraft)
}
return imap.Message{
ID: imap.MessageID(message.ID),
Flags: flags,
Date: time.Unix(message.Time, 0),
}
}
func toIMAPMailbox(label liteapi.Label, flags, permFlags, attrs imap.FlagSet) imap.Mailbox {
var name []string
if label.Type == liteapi.LabelTypeLabel {
name = append(name, labelPrefix)
} else if label.Type == liteapi.LabelTypeFolder {
name = append(name, folderPrefix)
}
return imap.Mailbox{
ID: imap.MailboxID(label.ID),
Name: append(name, label.Name),
Flags: flags,
PermanentFlags: permFlags,
Attributes: attrs,
}
}

View File

@ -1,3 +1,20 @@
// Copyright (c) 2022 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package user
import (
@ -14,7 +31,7 @@ import (
"github.com/sirupsen/logrus"
)
const sendHashExpiry = 5 * time.Minute
const sendEntryExpiry = 30 * time.Minute
type sendRecorder struct {
hasher func([]byte) (string, error)
@ -164,8 +181,8 @@ func (h *sendRecorder) wait(ctx context.Context, hash string, deadline time.Time
ctx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
waitCh, err := h.getWaitCh(hash)
if err != nil {
waitCh, ok := h.getWaitCh(hash)
if !ok {
return "", false, nil
}
@ -187,24 +204,26 @@ func (h *sendRecorder) wait(ctx context.Context, hash string, deadline time.Time
return "", false, nil
}
func (h *sendRecorder) getWaitCh(hash string) (<-chan struct{}, error) {
func (h *sendRecorder) getWaitCh(hash string) (<-chan struct{}, bool) {
h.entriesLock.Lock()
defer h.entriesLock.Unlock()
if entry, ok := h.entries[hash]; ok {
return entry.waitCh, nil
return entry.waitCh, true
}
return nil, fmt.Errorf("no entry with hash %s", hash)
return nil, false
}
// getMessageHash returns the hash of the given message.
// This takes into account:
// - the Subject header
// - the From/To/Cc/Bcc headers
// - the Content-Type header of each (leaf) part
// - the Content-Disposition header of each (leaf) part
// - the (decoded) body of each part
// - the Subject header,
// - the From/To/Cc/Bcc headers,
// - the Content-Type header of each (leaf) part,
// - the Content-Disposition header of each (leaf) part,
// - the (decoded) body of each part.
//
// nolint:funlen
func getMessageHash(b []byte) (string, error) {
section := rfc822.Parse(b)

View File

@ -1,3 +1,20 @@
// Copyright (c) 2022 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package user
import (
@ -9,7 +26,7 @@ import (
)
func TestSendHasher_Insert(t *testing.T) {
h := newSendRecorder(sendHashExpiry)
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash1, ok, err := h.tryInsertWait(context.Background(), []byte(literal1), time.Now().Add(time.Second))
@ -57,7 +74,7 @@ func TestSendHasher_Insert_Expired(t *testing.T) {
}
func TestSendHasher_Wait_SendSuccess(t *testing.T) {
h := newSendRecorder(sendHashExpiry)
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := h.tryInsertWait(context.Background(), []byte(literal1), time.Now().Add(time.Second))
@ -78,7 +95,7 @@ func TestSendHasher_Wait_SendSuccess(t *testing.T) {
}
func TestSendHasher_Wait_SendFail(t *testing.T) {
h := newSendRecorder(sendHashExpiry)
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := h.tryInsertWait(context.Background(), []byte(literal1), time.Now().Add(time.Second))
@ -102,7 +119,7 @@ func TestSendHasher_Wait_SendFail(t *testing.T) {
}
func TestSendHasher_Wait_Timeout(t *testing.T) {
h := newSendRecorder(sendHashExpiry)
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := h.tryInsertWait(context.Background(), []byte(literal1), time.Now().Add(time.Second))
@ -116,7 +133,7 @@ func TestSendHasher_Wait_Timeout(t *testing.T) {
}
func TestSendHasher_HasEntry(t *testing.T) {
h := newSendRecorder(sendHashExpiry)
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := h.tryInsertWait(context.Background(), []byte(literal1), time.Now().Add(time.Second))
@ -135,7 +152,7 @@ func TestSendHasher_HasEntry(t *testing.T) {
}
func TestSendHasher_HasEntry_SendSuccess(t *testing.T) {
h := newSendRecorder(sendHashExpiry)
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := h.tryInsertWait(context.Background(), []byte(literal1), time.Now().Add(time.Second))
@ -157,7 +174,7 @@ func TestSendHasher_HasEntry_SendSuccess(t *testing.T) {
}
func TestSendHasher_HasEntry_SendFail(t *testing.T) {
h := newSendRecorder(sendHashExpiry)
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := h.tryInsertWait(context.Background(), []byte(literal1), time.Now().Add(time.Second))
@ -178,7 +195,7 @@ func TestSendHasher_HasEntry_SendFail(t *testing.T) {
}
func TestSendHasher_HasEntry_Timeout(t *testing.T) {
h := newSendRecorder(sendHashExpiry)
h := newSendRecorder(sendEntryExpiry)
// Insert a message into the hasher.
hash, ok, err := h.tryInsertWait(context.Background(), []byte(literal1), time.Now().Add(time.Second))

View File

@ -19,14 +19,12 @@ package user
import (
"fmt"
"time"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gopenpgp/v2/crypto"
"github.com/ProtonMail/proton-bridge/v2/pkg/message"
"github.com/bradenaw/juniper/xslices"
"gitlab.protontech.ch/go/liteapi"
"golang.org/x/exp/slices"
)
type buildRes struct {
@ -70,28 +68,8 @@ func newMessageCreatedUpdate(message liteapi.MessageMetadata, literal []byte) (*
return nil, err
}
flags := imap.NewFlagSet()
if !message.Unread {
flags = flags.Add(imap.FlagSeen)
}
if slices.Contains(message.LabelIDs, liteapi.StarredLabel) {
flags = flags.Add(imap.FlagFlagged)
}
if slices.Contains(message.LabelIDs, liteapi.DraftsLabel) {
flags = flags.Add(imap.FlagDraft)
}
imapMessage := imap.Message{
ID: imap.MessageID(message.ID),
Flags: flags,
Date: time.Unix(message.Time, 0),
}
return &imap.MessageCreated{
Message: imapMessage,
Message: toIMAPMessage(message),
Literal: literal,
MailboxIDs: mapTo[string, imap.MailboxID](xslices.Filter(message.LabelIDs, wantLabelID)),
ParsedMessage: parsedMessage,

View File

@ -127,7 +127,7 @@ func New(
apiUser: safe.NewValue(apiUser),
apiAddrs: safe.NewMapFrom(groupBy(apiAddrs, func(addr liteapi.Address) string { return addr.ID }), sortAddr),
updateCh: safe.NewMapFrom(updateCh, nil),
sendHash: newSendRecorder(sendHashExpiry),
sendHash: newSendRecorder(sendEntryExpiry),
tasks: xsync.NewGroup(context.Background()),