Files
proton-bridge/internal/bridge/sync_test.go
Leander Beernaert 951c7c27fb fix(GODT-3003): Ensure IMAP State is reset after vault corruption
After we detect that the user has suffered the GODT-3003 bug due the
vault corruption not ensuring that a previous sync state would be
erased, we patch the gluon db directly and then reset the sync state.

After the account is added, the sync is automatically triggered and the
account state fixes itself.
2023-10-09 11:19:36 +01:00

788 lines
26 KiB
Go

// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package bridge_test
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/rfc822"
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/go-proton-api/server"
"github.com/ProtonMail/proton-bridge/v3/internal/bridge"
"github.com/ProtonMail/proton-bridge/v3/internal/constants"
"github.com/ProtonMail/proton-bridge/v3/internal/events"
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice"
"github.com/bradenaw/juniper/iterator"
"github.com/bradenaw/juniper/stream"
"github.com/bradenaw/juniper/xslices"
"github.com/emersion/go-imap"
"github.com/emersion/go-imap/client"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
)
func TestBridge_Sync(t *testing.T) {
numMsg := 1 << 8
withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, storeKey []byte) {
userID, addrID, err := s.CreateUser("imap", password)
require.NoError(t, err)
labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder)
require.NoError(t, err)
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
createNumMessages(ctx, t, c, addrID, labelID, numMsg)
})
var total uint64
// The initial user should be fully synced.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, _ *bridge.Mocks) {
syncCh, done := chToType[events.Event, events.SyncFinished](bridge.GetEvents(events.SyncFinished{}))
defer done()
// Count how many bytes it takes to fully sync the user.
total = countBytesRead(netCtl, func() {
userID, err := bridge.LoginFull(ctx, "imap", password, nil, nil)
require.NoError(t, err)
require.Equal(t, userID, (<-syncCh).UserID)
})
})
// If we then connect an IMAP client, it should see all the messages.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(b *bridge.Bridge, _ *bridge.Mocks) {
info, err := b.GetUserInfo(userID)
require.NoError(t, err)
require.True(t, info.State == bridge.Connected)
client, err := eventuallyDial(fmt.Sprintf("%v:%v", constants.Host, b.GetIMAPPort()))
require.NoError(t, err)
require.NoError(t, client.Login(info.Addresses[0], string(info.BridgePass)))
defer func() { _ = client.Logout() }()
status, err := client.Select(`Folders/folder`, false)
require.NoError(t, err)
require.Equal(t, uint32(numMsg), status.Messages)
})
// Now let's remove the user and simulate a network error.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, _ *bridge.Mocks) {
require.NoError(t, bridge.DeleteUser(ctx, userID))
})
// Pretend we can only sync 2/3 of the original messages.
netCtl.SetReadLimit(2 * total / 3)
// Login the user; its sync should fail.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(b *bridge.Bridge, _ *bridge.Mocks) {
{
syncCh, done := chToType[events.Event, events.SyncFailed](b.GetEvents(events.SyncFailed{}))
defer done()
userID, err := b.LoginFull(ctx, "imap", password, nil, nil)
require.NoError(t, err)
require.Equal(t, userID, (<-syncCh).UserID)
info, err := b.GetUserInfo(userID)
require.NoError(t, err)
require.True(t, info.State == bridge.Connected)
}
// Remove the network limit, allowing the sync to finish.
netCtl.SetReadLimit(0)
{
syncCh, done := chToType[events.Event, events.SyncFinished](b.GetEvents(events.SyncFinished{}))
defer done()
require.Equal(t, userID, (<-syncCh).UserID)
info, err := b.GetUserInfo(userID)
require.NoError(t, err)
require.True(t, info.State == bridge.Connected)
client, err := eventuallyDial(fmt.Sprintf("%v:%v", constants.Host, b.GetIMAPPort()))
require.NoError(t, err)
require.NoError(t, client.Login(info.Addresses[0], string(info.BridgePass)))
defer func() { _ = client.Logout() }()
status, err := client.Select(`Folders/folder`, false)
require.NoError(t, err)
require.Equal(t, uint32(numMsg), status.Messages)
}
})
}, server.WithTLS(false))
}
// GODT-2215: This test no longer works since it's now possible to import messages into Gluon with bad ContentType header.
func _TestBridge_Sync_BadMessage(t *testing.T) { //nolint:unused,deadcode
withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, storeKey []byte) {
userID, addrID, err := s.CreateUser("imap", password)
require.NoError(t, err)
labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder)
require.NoError(t, err)
var messageIDs []string
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
messageIDs = createMessages(ctx, t, c, addrID, labelID,
[]byte("To: someone@pm.me\r\nSubject: Good message\r\n\r\nHello!"),
[]byte("To: someone@pm.me\r\nSubject: Bad message\r\nContentType: this is not a valid content type\r\n\r\nHello!"),
)
})
// The initial user should be fully synced and should skip the bad message.
// We should report the bad message to sentry.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, mocks *bridge.Mocks) {
mocks.Reporter.EXPECT().ReportMessageWithContext("Failed to build message (sync)", gomock.Any())
syncCh, done := chToType[events.Event, events.SyncFinished](bridge.GetEvents(events.SyncFinished{}))
defer done()
userID, err := bridge.LoginFull(ctx, "imap", password, nil, nil)
require.NoError(t, err)
require.Equal(t, userID, (<-syncCh).UserID)
})
// If we then connect an IMAP client, it should see the good message but not the bad one.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(b *bridge.Bridge, _ *bridge.Mocks) {
info, err := b.GetUserInfo(userID)
require.NoError(t, err)
require.True(t, info.State == bridge.Connected)
client, err := eventuallyDial(fmt.Sprintf("%v:%v", constants.Host, b.GetIMAPPort()))
require.NoError(t, err)
require.NoError(t, client.Login(info.Addresses[0], string(info.BridgePass)))
defer func() { _ = client.Logout() }()
status, err := client.Select(`Folders/folder`, false)
require.NoError(t, err)
require.Equal(t, uint32(1), status.Messages)
messages, err := clientFetch(client, `Folders/folder`)
require.NoError(t, err)
require.Len(t, messages, 1)
// The bad message should have been skipped.
literal, err := io.ReadAll(messages[0].GetBody(must(imap.ParseBodySectionName("BODY[]"))))
require.NoError(t, err)
header, err := rfc822.Parse(literal).ParseHeader()
require.NoError(t, err)
require.Equal(t, "Good message", header.Get("Subject"))
require.Equal(t, messageIDs[0], header.Get("X-Pm-Internal-Id"))
})
})
}
func TestBridge_SyncWithOngoingEvents(t *testing.T) {
numMsg := 1 << 8
messageSplitIndex := numMsg * 2 / 3
renmainingMessageCount := numMsg - messageSplitIndex
messages := make([]string, 0, numMsg)
withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, storeKey []byte) {
userID, addrID, err := s.CreateUser("imap", password)
require.NoError(t, err)
labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder)
require.NoError(t, err)
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
importResults := createNumMessages(ctx, t, c, addrID, labelID, numMsg)
for _, v := range importResults {
if len(v) != 0 {
messages = append(messages, v)
}
}
})
var total uint64
// The initial user should be fully synced.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, mocks *bridge.Mocks) {
syncCh, done := chToType[events.Event, events.SyncFinished](bridge.GetEvents(events.SyncFinished{}))
defer done()
// Count how many bytes it takes to fully sync the user.
total = countBytesRead(netCtl, func() {
userID, err := bridge.LoginFull(ctx, "imap", password, nil, nil)
require.NoError(t, err)
require.Equal(t, userID, (<-syncCh).UserID)
})
})
// Now let's remove the user and stop the network at 2/3 of the data.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, mocks *bridge.Mocks) {
require.NoError(t, bridge.DeleteUser(ctx, userID))
})
// Pretend we can only sync 2/3 of the original messages.
netCtl.SetReadLimit(2 * total / 3)
// Login the user; its sync should fail.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(b *bridge.Bridge, mocks *bridge.Mocks) {
syncCh, done := chToType[events.Event, events.SyncFinished](b.GetEvents(events.SyncFinished{}))
defer done()
{
syncFailedCh, syncFailedDone := chToType[events.Event, events.SyncFailed](b.GetEvents(events.SyncFailed{}))
defer syncFailedDone()
userID, err := b.LoginFull(ctx, "imap", password, nil, nil)
require.NoError(t, err)
require.Equal(t, userID, (<-syncFailedCh).UserID)
info, err := b.GetUserInfo(userID)
require.NoError(t, err)
require.True(t, info.State == bridge.Connected)
}
// Create a new mailbox and move that last 1/3 of the messages into it to simulate user
// actions during sync.
{
newLabelID, err := s.CreateLabel(userID, "folder2", "", proton.LabelTypeFolder)
require.NoError(t, err)
messages := messages[messageSplitIndex:]
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
require.NoError(t, c.UnlabelMessages(ctx, messages, labelID))
require.NoError(t, c.LabelMessages(ctx, messages, newLabelID))
})
}
// Remove the network limit, allowing the sync to finish.
netCtl.SetReadLimit(0)
{
require.Equal(t, userID, (<-syncCh).UserID)
info, err := b.GetUserInfo(userID)
require.NoError(t, err)
require.True(t, info.State == bridge.Connected)
client, err := eventuallyDial(fmt.Sprintf("%v:%v", constants.Host, b.GetIMAPPort()))
require.NoError(t, err)
require.NoError(t, client.Login(info.Addresses[0], string(info.BridgePass)))
defer func() { _ = client.Logout() }()
// Check that the new messages arrive in the right location.
require.Eventually(t, func() bool {
status, err := client.Select(`Folders/folder2`, true)
if err != nil {
return false
}
if status.Messages != uint32(renmainingMessageCount) {
return false
}
return true
}, 10*time.Second, 500*time.Millisecond)
}
})
}, server.WithTLS(false))
}
func TestBridge_CanProcessEventsDuringSync(t *testing.T) {
numMsg := 1 << 8
withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, storeKey []byte) {
userID, addrID, err := s.CreateUser("imap", password)
require.NoError(t, err)
labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder)
require.NoError(t, err)
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
createNumMessages(ctx, t, c, addrID, labelID, numMsg)
})
// Simulate 429 to prevent sync from progressing.
s.AddStatusHook(func(request *http.Request) (int, bool) {
if strings.Contains(request.URL.Path, "/mail/v4/messages/") {
return http.StatusTooManyRequests, true
}
return 0, false
})
// The initial user should be fully synced.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, _ *bridge.Mocks) {
syncStartedCh, syncStartedDone := chToType[events.Event, events.SyncStarted](bridge.GetEvents(events.SyncStarted{}))
defer syncStartedDone()
addressCreatedCh, addressCreatedDone := chToType[events.Event, events.UserAddressCreated](bridge.GetEvents(events.UserAddressCreated{}))
defer addressCreatedDone()
userID, err := bridge.LoginFull(ctx, "imap", password, nil, nil)
require.NoError(t, err)
require.Equal(t, userID, (<-syncStartedCh).UserID)
// Create a new address
newAddress := "foo@proton.ch"
addrID, err := s.CreateAddress(userID, newAddress, password)
require.NoError(t, err)
event := <-addressCreatedCh
require.Equal(t, userID, event.UserID)
require.Equal(t, newAddress, event.Email)
require.Equal(t, addrID, event.AddressID)
})
}, server.WithTLS(false))
}
func TestBridge_RefreshDuringSyncRestartSync(t *testing.T) {
numMsg := 1 << 8
withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, storeKey []byte) {
userID, addrID, err := s.CreateUser("imap", password)
require.NoError(t, err)
labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder)
require.NoError(t, err)
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
createNumMessages(ctx, t, c, addrID, labelID, numMsg)
})
var refreshPerformed atomic.Bool
refreshPerformed.Store(false)
// Simulate 429 to prevent sync from progressing.
s.AddStatusHook(func(request *http.Request) (int, bool) {
if strings.Contains(request.URL.Path, "/mail/v4/messages/") {
if !refreshPerformed.Load() {
return http.StatusTooManyRequests, true
}
}
return 0, false
})
// The initial user should be fully synced.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, _ *bridge.Mocks) {
syncCh, done := chToType[events.Event, events.SyncFinished](bridge.GetEvents(events.SyncFinished{}))
defer done()
userID, err := bridge.LoginFull(ctx, "imap", password, nil, nil)
require.NoError(t, err)
syncStartedCh, syncStartedDone := chToType[events.Event, events.SyncStarted](bridge.GetEvents(events.SyncStarted{}))
defer syncStartedDone()
require.Equal(t, userID, (<-syncStartedCh).UserID)
require.NoError(t, err, s.RefreshUser(userID, proton.RefreshMail))
require.Equal(t, userID, (<-syncStartedCh).UserID)
refreshPerformed.Store(true)
require.Equal(t, userID, (<-syncCh).UserID)
})
}, server.WithTLS(false))
}
func TestBridge_EventReplayAfterSyncHasFinished(t *testing.T) {
numMsg := 1 << 8
withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, storeKey []byte) {
userID, addrID, err := s.CreateUser("imap", password)
require.NoError(t, err)
labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder)
require.NoError(t, err)
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
createNumMessages(ctx, t, c, addrID, labelID, numMsg)
})
addrID1, err := s.CreateAddress(userID, "foo@proton.ch", password)
require.NoError(t, err)
var allowSyncToProgress atomic.Bool
allowSyncToProgress.Store(false)
// Simulate 429 to prevent sync from progressing.
s.AddStatusHook(func(request *http.Request) (int, bool) {
if request.Method == "GET" && strings.Contains(request.URL.Path, "/mail/v4/messages/") {
if !allowSyncToProgress.Load() {
return http.StatusTooManyRequests, true
}
}
return 0, false
})
// The initial user should be fully synced.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, _ *bridge.Mocks) {
syncCh, done := chToType[events.Event, events.SyncFinished](bridge.GetEvents(events.SyncFinished{}))
defer done()
syncStartedCh, syncStartedDone := chToType[events.Event, events.SyncStarted](bridge.GetEvents(events.SyncStarted{}))
defer syncStartedDone()
addressCreatedCh, addressCreatedDone := chToType[events.Event, events.UserAddressCreated](bridge.GetEvents(events.UserAddressCreated{}))
defer addressCreatedDone()
userID, err := bridge.LoginFull(ctx, "imap", password, nil, nil)
require.NoError(t, err)
require.Equal(t, userID, (<-syncStartedCh).UserID)
// create 20 more messages and move them to inbox
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
createNumMessages(ctx, t, c, addrID, proton.InboxLabel, 20)
})
// User AddrID2 event as a check point to see when the new address was created.
addrID2, err := s.CreateAddress(userID, "bar@proton.ch", password)
require.NoError(t, err)
allowSyncToProgress.Store(true)
require.Equal(t, userID, (<-syncCh).UserID)
// At most two events can be published, one for the first address, then for the second.
// if the second event is not `addrID2` then something went wrong.
event := <-addressCreatedCh
if event.AddressID == addrID1 {
event = <-addressCreatedCh
}
require.Equal(t, addrID2, event.AddressID)
info, err := bridge.GetUserInfo(userID)
require.NoError(t, err)
client, err := eventuallyDial(fmt.Sprintf("%v:%v", constants.Host, bridge.GetIMAPPort()))
require.NoError(t, err)
require.NoError(t, client.Login(info.Addresses[0], string(info.BridgePass)))
defer func() { _ = client.Logout() }()
// Finally check if the 20 messages are in INBOX.
status, err := client.Status("INBOX", []imap.StatusItem{imap.StatusMessages})
require.NoError(t, err)
require.Equal(t, uint32(20), status.Messages)
// Finally check if the numMsg are in the folder.
status, err = client.Status("Folders/folder", []imap.StatusItem{imap.StatusMessages})
require.NoError(t, err)
require.Equal(t, uint32(numMsg), status.Messages)
})
}, server.WithTLS(false))
}
func TestBridge_MessageCreateDuringSync(t *testing.T) {
numMsg := 1 << 8
withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, storeKey []byte) {
userID, addrID, err := s.CreateUser("imap", password)
require.NoError(t, err)
labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder)
require.NoError(t, err)
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
createNumMessages(ctx, t, c, addrID, labelID, numMsg)
})
var allowSyncToProgress atomic.Bool
allowSyncToProgress.Store(false)
// Simulate 429 to prevent sync from progressing.
s.AddStatusHook(func(request *http.Request) (int, bool) {
if request.Method == "GET" && strings.Contains(request.URL.Path, "/mail/v4/messages/") {
if !allowSyncToProgress.Load() {
return http.StatusTooManyRequests, true
}
}
return 0, false
})
// The initial user should be fully synced.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, _ *bridge.Mocks) {
syncStartedCh, syncStartedDone := chToType[events.Event, events.SyncStarted](bridge.GetEvents(events.SyncStarted{}))
defer syncStartedDone()
addressCreatedCh, addressCreatedDone := chToType[events.Event, events.UserAddressCreated](bridge.GetEvents(events.UserAddressCreated{}))
defer addressCreatedDone()
userID, err := bridge.LoginFull(ctx, "imap", password, nil, nil)
require.NoError(t, err)
require.Equal(t, userID, (<-syncStartedCh).UserID)
// create 20 more messages and move them to inbox
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
createNumMessages(ctx, t, c, addrID, proton.InboxLabel, 20)
})
// User AddrID2 event as a check point to see when the new address was created.
addrID, err := s.CreateAddress(userID, "bar@proton.ch", password)
require.NoError(t, err)
// At most two events can be published, one for the first address, then for the second.
// if the second event is not `addrID` then something went wrong.
event := <-addressCreatedCh
require.Equal(t, addrID, event.AddressID)
allowSyncToProgress.Store(true)
info, err := bridge.GetUserInfo(userID)
require.NoError(t, err)
client, err := eventuallyDial(fmt.Sprintf("%v:%v", constants.Host, bridge.GetIMAPPort()))
require.NoError(t, err)
require.NoError(t, client.Login(info.Addresses[0], string(info.BridgePass)))
defer func() { _ = client.Logout() }()
require.Eventually(t, func() bool {
// Finally check if the 20 messages are in INBOX.
status, err := client.Status("INBOX", []imap.StatusItem{imap.StatusMessages})
require.NoError(t, err)
return uint32(20) == status.Messages
}, 10*time.Second, time.Second)
})
}, server.WithTLS(false))
}
func TestBridge_CorruptedVaultClearsPreviousIMAPSyncState(t *testing.T) {
withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, vaultKey []byte) {
userID, addrID, err := s.CreateUser("imap", password)
require.NoError(t, err)
labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder)
require.NoError(t, err)
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
createNumMessages(ctx, t, c, addrID, labelID, 100)
})
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, vaultKey, func(bridge *bridge.Bridge, mocks *bridge.Mocks) {
syncCh, done := chToType[events.Event, events.SyncFinished](bridge.GetEvents(events.SyncFinished{}))
defer done()
var err error
userID, err = bridge.LoginFull(context.Background(), "imap", password, nil, nil)
require.NoError(t, err)
// Wait for sync to finish
require.Equal(t, userID, (<-syncCh).UserID)
})
settingsPath, err := locator.ProvideSettingsPath()
require.NoError(t, err)
syncConfigPath, err := locator.ProvideIMAPSyncConfigPath()
require.NoError(t, err)
syncStatePath := imapservice.GetSyncConfigPath(syncConfigPath, userID)
// Check sync state is complete
{
state, err := imapservice.NewSyncState(syncStatePath)
require.NoError(t, err)
syncStatus, err := state.GetSyncStatus(context.Background())
require.NoError(t, err)
require.True(t, syncStatus.IsComplete())
}
// corrupt the vault
require.NoError(t, os.WriteFile(filepath.Join(settingsPath, "vault.enc"), []byte("Trash!"), 0o600))
// Bridge starts but can't find the gluon database dir; there should be no error.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, vaultKey, func(bridge *bridge.Bridge, mocks *bridge.Mocks) {
_, err := bridge.LoginFull(context.Background(), "imap", password, nil, nil)
require.NoError(t, err)
})
// Check sync state is reset.
{
state, err := imapservice.NewSyncState(syncStatePath)
require.NoError(t, err)
syncStatus, err := state.GetSyncStatus(context.Background())
require.NoError(t, err)
require.False(t, syncStatus.IsComplete())
}
})
}
func withClient(ctx context.Context, t *testing.T, s *server.Server, username string, password []byte, fn func(context.Context, *proton.Client)) { //nolint:unparam
m := proton.New(
proton.WithHostURL(s.GetHostURL()),
proton.WithTransport(proton.InsecureTransport()),
)
c, _, err := m.NewClientWithLogin(ctx, username, password)
require.NoError(t, err)
defer c.Close()
fn(ctx, c)
}
func clientFetch(client *client.Client, mailbox string, extraItems ...imap.FetchItem) ([]*imap.Message, error) {
status, err := client.Select(mailbox, false)
if err != nil {
return nil, err
}
if status.Messages == 0 {
return nil, nil
}
resCh := make(chan *imap.Message)
fetchItems := []imap.FetchItem{imap.FetchFlags, imap.FetchEnvelope, imap.FetchUid, imap.FetchBodyStructure, "BODY.PEEK[]"}
fetchItems = append(fetchItems, extraItems...)
go func() {
if err := client.Fetch(
&imap.SeqSet{Set: []imap.Seq{{Start: 1, Stop: status.Messages}}},
fetchItems,
resCh,
); err != nil {
panic(err)
}
}()
return iterator.Collect(iterator.Chan(resCh)), nil
}
func clientStore(client *client.Client, from, to int, isUID bool, item imap.StoreItem, flags ...string) error {
var storeFunc func(seqset *imap.SeqSet, item imap.StoreItem, value interface{}, ch chan *imap.Message) error
if isUID {
storeFunc = client.UidStore
} else {
storeFunc = client.Store
}
return storeFunc(
&imap.SeqSet{Set: []imap.Seq{{Start: uint32(from), Stop: uint32(to)}}},
item,
xslices.Map(flags, func(flag string) interface{} { return flag }),
nil,
)
}
func clientList(client *client.Client) []*imap.MailboxInfo {
resCh := make(chan *imap.MailboxInfo)
go func() {
if err := client.List("", "*", resCh); err != nil {
panic(err)
}
}()
return iterator.Collect(iterator.Chan(resCh))
}
func createNumMessages(ctx context.Context, t *testing.T, c *proton.Client, addrID, labelID string, count int) []string {
literal, err := os.ReadFile(filepath.Join("testdata", "text-plain.eml"))
require.NoError(t, err)
return createMessages(ctx, t, c, addrID, labelID, xslices.Repeat(literal, count)...)
}
func createMessages(ctx context.Context, t *testing.T, c *proton.Client, addrID, labelID string, messages ...[]byte) []string {
return createMessagesWithFlags(ctx, t, c, addrID, labelID, 0, messages...)
}
func createMessagesWithFlags(ctx context.Context, t *testing.T, c *proton.Client, addrID, labelID string, flags proton.MessageFlag, messages ...[]byte) []string {
user, err := c.GetUser(ctx)
require.NoError(t, err)
addr, err := c.GetAddresses(ctx)
require.NoError(t, err)
salt, err := c.GetSalts(ctx)
require.NoError(t, err)
keyPass, err := salt.SaltForKey(password, user.Keys.Primary().ID)
require.NoError(t, err)
_, addrKRs, err := proton.Unlock(user, addr, keyPass, async.NoopPanicHandler{})
require.NoError(t, err)
_, ok := addrKRs[addrID]
require.True(t, ok)
var msgFlags proton.MessageFlag
if flags == 0 {
msgFlags = proton.MessageFlagReceived
} else {
msgFlags = flags
}
str, err := c.ImportMessages(
ctx,
addrKRs[addrID],
runtime.NumCPU(),
runtime.NumCPU(),
xslices.Map(messages, func(message []byte) proton.ImportReq {
return proton.ImportReq{
Metadata: proton.ImportMetadata{
AddressID: addrID,
LabelIDs: []string{labelID},
Flags: msgFlags,
},
Message: message,
}
})...,
)
require.NoError(t, err)
res, err := stream.Collect(ctx, str)
require.NoError(t, err)
return xslices.Map(res, func(res proton.ImportRes) string {
return res.MessageID
})
}
func countBytesRead(ctl *proton.NetCtl, fn func()) uint64 {
var read uint64
ctl.OnRead(func(b []byte) {
atomic.AddUint64(&read, uint64(len(b)))
})
fn()
return read
}