diff --git a/internal/frontend/qt/frontend_settings.go b/internal/frontend/qt/frontend_settings.go index 545de57d..0e00102b 100644 --- a/internal/frontend/qt/frontend_settings.go +++ b/internal/frontend/qt/frontend_settings.go @@ -61,18 +61,15 @@ func (f *FrontendQt) changeLocalCache(enableDiskCache bool, diskCachePath *core. _diskCachePath = _diskCachePath[1:] } - // If disk cache not enabled, or path not changed then no need to change location - if !enableDiskCache || _diskCachePath == f.settings.Get(settings.CacheLocationKey) { - return + if enableDiskCache && _diskCachePath != f.settings.Get(settings.CacheLocationKey) { + if err := f.bridge.MigrateCache(f.settings.Get(settings.CacheLocationKey), _diskCachePath); err != nil { + f.log.WithError(err).Error("The local cache location could not be changed.") + f.qml.CacheCantMove() + return + } + f.settings.Set(settings.CacheLocationKey, _diskCachePath) } - if err := f.bridge.MigrateCache(f.settings.Get(settings.CacheLocationKey), _diskCachePath); err != nil { - f.log.WithError(err).Error("The local cache location could not be changed.") - f.qml.CacheCantMove() - return - } - - f.settings.Set(settings.CacheLocationKey, _diskCachePath) f.qml.CacheLocationChangeSuccess() f.restart() } diff --git a/internal/store/cache.go b/internal/store/cache.go index 96fe30ac..cec49c12 100644 --- a/internal/store/cache.go +++ b/internal/store/cache.go @@ -18,6 +18,8 @@ package store import ( + "context" + "github.com/ProtonMail/gopenpgp/v2/crypto" "github.com/ProtonMail/proton-bridge/pkg/message" "github.com/sirupsen/logrus" @@ -59,7 +61,7 @@ func (store *Store) UnlockCache(kr *crypto.KeyRing) error { return err } - store.cacher.start() + store.msgCachePool.start() return nil } @@ -112,7 +114,7 @@ func (store *Store) getCachedMessage(messageID string) ([]byte, error) { return store.cache.Get(store.user.ID(), messageID) } - job, done := store.newBuildJob(messageID, message.ForegroundPriority) + job, done := store.newBuildJob(context.Background(), messageID, message.ForegroundPriority) defer done() literal, err := job.GetResult() @@ -135,11 +137,11 @@ func (store *Store) IsCached(messageID string) bool { // BuildAndCacheMessage builds the given message (with background priority) and puts it in the cache. // It builds with background priority. -func (store *Store) BuildAndCacheMessage(messageID string) error { +func (store *Store) BuildAndCacheMessage(ctx context.Context, messageID string) error { buildAndCacheJobs <- struct{}{} defer func() { <-buildAndCacheJobs }() - job, done := store.newBuildJob(messageID, message.BackgroundPriority) + job, done := store.newBuildJob(ctx, messageID, message.BackgroundPriority) defer done() literal, err := job.GetResult() diff --git a/internal/store/cache/disk.go b/internal/store/cache/disk.go index b4b3ee33..307ae0f1 100644 --- a/internal/store/cache/disk.go +++ b/internal/store/cache/disk.go @@ -34,6 +34,12 @@ import ( var ErrLowSpace = errors.New("not enough free space left on device") +// IsOnDiskCache will return true if Cache is type of onDiskCache. +func IsOnDiskCache(c Cache) bool { + _, ok := c.(*onDiskCache) + return ok +} + type onDiskCache struct { path string opts Options diff --git a/internal/store/cache_watcher.go b/internal/store/cache_watcher.go index cc8701f3..1e1471a1 100644 --- a/internal/store/cache_watcher.go +++ b/internal/store/cache_watcher.go @@ -17,14 +17,27 @@ package store -import "time" +import ( + "context" + "time" + + "github.com/ProtonMail/proton-bridge/internal/store/cache" +) func (store *Store) StartWatcher() { + if !cache.IsOnDiskCache(store.cache) { + return + } + store.done = make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + store.msgCachePool.ctx = ctx + go func() { ticker := time.NewTicker(10 * time.Minute) defer ticker.Stop() + defer cancel() for { // NOTE(GODT-1158): Race condition here? What if DB was already closed? @@ -35,7 +48,7 @@ func (store *Store) StartWatcher() { for _, messageID := range messageIDs { if !store.IsCached(messageID) { - store.cacher.newJob(messageID) + store.msgCachePool.newJob(messageID) } } diff --git a/internal/store/cache_worker.go b/internal/store/cache_worker.go index 68173428..c846798c 100644 --- a/internal/store/cache_worker.go +++ b/internal/store/cache_worker.go @@ -18,35 +18,38 @@ package store import ( + "context" "sync" "github.com/sirupsen/logrus" ) -type Cacher struct { +type MsgCachePool struct { storer Storer jobs chan string done chan struct{} started bool wg *sync.WaitGroup + ctx context.Context } type Storer interface { IsCached(messageID string) bool - BuildAndCacheMessage(messageID string) error + BuildAndCacheMessage(ctx context.Context, messageID string) error } -func newCacher(storer Storer) *Cacher { - return &Cacher{ +func newMsgCachePool(storer Storer) *MsgCachePool { + return &MsgCachePool{ storer: storer, jobs: make(chan string), done: make(chan struct{}), wg: &sync.WaitGroup{}, + ctx: context.Background(), } } // newJob sends a new job to the cacher if it's running. -func (cacher *Cacher) newJob(messageID string) { +func (cacher *MsgCachePool) newJob(messageID string) { if !cacher.started { return } @@ -63,7 +66,7 @@ func (cacher *Cacher) newJob(messageID string) { } } -func (cacher *Cacher) start() { +func (cacher *MsgCachePool) start() { cacher.started = true go func() { @@ -79,17 +82,17 @@ func (cacher *Cacher) start() { }() } -func (cacher *Cacher) handleJob(messageID string) { +func (cacher *MsgCachePool) handleJob(messageID string) { defer cacher.wg.Done() - if err := cacher.storer.BuildAndCacheMessage(messageID); err != nil { + if err := cacher.storer.BuildAndCacheMessage(cacher.ctx, messageID); err != nil { logrus.WithError(err).Error("Failed to build and cache message") } else { logrus.WithField("messageID", messageID).Trace("Message cached") } } -func (cacher *Cacher) stop() { +func (cacher *MsgCachePool) stop() { cacher.started = false cacher.wg.Wait() diff --git a/internal/store/cache_worker_test.go b/internal/store/cache_worker_test.go index 6bd6b121..d3d0f8e2 100644 --- a/internal/store/cache_worker_test.go +++ b/internal/store/cache_worker_test.go @@ -25,7 +25,7 @@ import ( "github.com/pkg/errors" ) -func withTestCacher(t *testing.T, doTest func(storer *storemocks.MockStorer, cacher *Cacher)) { +func withTestCacher(t *testing.T, doTest func(storer *storemocks.MockStorer, cacher *MsgCachePool)) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -33,7 +33,7 @@ func withTestCacher(t *testing.T, doTest func(storer *storemocks.MockStorer, cac storer := storemocks.NewMockStorer(ctrl) // Create a new cacher pointing to the fake store. - cacher := newCacher(storer) + cacher := newMsgCachePool(storer) // Start the cacher and wait for it to stop. cacher.start() @@ -44,16 +44,16 @@ func withTestCacher(t *testing.T, doTest func(storer *storemocks.MockStorer, cac func TestCacher(t *testing.T) { // If the message is not yet cached, we should expect to try to build and cache it. - withTestCacher(t, func(storer *storemocks.MockStorer, cacher *Cacher) { + withTestCacher(t, func(storer *storemocks.MockStorer, cacher *MsgCachePool) { storer.EXPECT().IsCached("messageID").Return(false) - storer.EXPECT().BuildAndCacheMessage("messageID").Return(nil) + storer.EXPECT().BuildAndCacheMessage(cacher.ctx, "messageID").Return(nil) cacher.newJob("messageID") }) } func TestCacherAlreadyCached(t *testing.T) { // If the message is already cached, we should not try to build it. - withTestCacher(t, func(storer *storemocks.MockStorer, cacher *Cacher) { + withTestCacher(t, func(storer *storemocks.MockStorer, cacher *MsgCachePool) { storer.EXPECT().IsCached("messageID").Return(true) cacher.newJob("messageID") }) @@ -61,9 +61,9 @@ func TestCacherAlreadyCached(t *testing.T) { func TestCacherFail(t *testing.T) { // If building the message fails, we should not try to cache it. - withTestCacher(t, func(storer *storemocks.MockStorer, cacher *Cacher) { + withTestCacher(t, func(storer *storemocks.MockStorer, cacher *MsgCachePool) { storer.EXPECT().IsCached("messageID").Return(false) - storer.EXPECT().BuildAndCacheMessage("messageID").Return(errors.New("failed to build message")) + storer.EXPECT().BuildAndCacheMessage(cacher.ctx, "messageID").Return(errors.New("failed to build message")) cacher.newJob("messageID") }) } @@ -76,14 +76,14 @@ func TestCacherStop(t *testing.T) { storer := storemocks.NewMockStorer(ctrl) // Create a new cacher pointing to the fake store. - cacher := newCacher(storer) + cacher := newMsgCachePool(storer) // Start the cacher. cacher.start() // Send a job -- this should succeed. storer.EXPECT().IsCached("messageID").Return(false) - storer.EXPECT().BuildAndCacheMessage("messageID").Return(nil) + storer.EXPECT().BuildAndCacheMessage(cacher.ctx, "messageID").Return(nil) cacher.newJob("messageID") // Stop the cacher. diff --git a/internal/store/mocks/mocks.go b/internal/store/mocks/mocks.go index 70f6b551..e0e8c6a4 100644 --- a/internal/store/mocks/mocks.go +++ b/internal/store/mocks/mocks.go @@ -355,17 +355,17 @@ func (m *MockStorer) EXPECT() *MockStorerMockRecorder { } // BuildAndCacheMessage mocks base method. -func (m *MockStorer) BuildAndCacheMessage(arg0 string) error { +func (m *MockStorer) BuildAndCacheMessage(arg0 context.Context, arg1 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BuildAndCacheMessage", arg0) + ret := m.ctrl.Call(m, "BuildAndCacheMessage", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // BuildAndCacheMessage indicates an expected call of BuildAndCacheMessage. -func (mr *MockStorerMockRecorder) BuildAndCacheMessage(arg0 interface{}) *gomock.Call { +func (mr *MockStorerMockRecorder) BuildAndCacheMessage(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildAndCacheMessage", reflect.TypeOf((*MockStorer)(nil).BuildAndCacheMessage), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildAndCacheMessage", reflect.TypeOf((*MockStorer)(nil).BuildAndCacheMessage), arg0, arg1) } // IsCached mocks base method. diff --git a/internal/store/store.go b/internal/store/store.go index 9c553af7..6814a504 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -135,10 +135,10 @@ type Store struct { addresses map[string]*Address notifier ChangeNotifier - builder *message.Builder - cache cache.Cache - cacher *Cacher - done chan struct{} + builder *message.Builder + cache cache.Cache + msgCachePool *MsgCachePool + done chan struct{} isSyncRunning bool syncCooldown cooldown @@ -194,7 +194,7 @@ func New( // nolint[funlen] // Create a new cacher. It's not started yet. // NOTE(GODT-1158): I hate this circular dependency store->cacher->store :( - store.cacher = newCacher(store) + store.msgCachePool = newMsgCachePool(store) // Minimal increase is event pollInterval, doubles every failed retry up to 5 minutes. store.syncCooldown.setExponentialWait(pollInterval, 2, 5*time.Minute) @@ -388,9 +388,9 @@ func (store *Store) addAddress(address, addressID string, labels []*pmapi.Label) } // newBuildJob returns a new build job for the given message using the store's message builder. -func (store *Store) newBuildJob(messageID string, priority int) (*message.Job, pool.DoneFunc) { +func (store *Store) newBuildJob(ctx context.Context, messageID string, priority int) (*message.Job, pool.DoneFunc) { return store.builder.NewJobWithOptions( - context.Background(), + ctx, store.client(), messageID, message.JobOptions{ @@ -421,7 +421,7 @@ func (store *Store) CloseEventLoopAndCacher() { store.stopWatcher() - store.cacher.stop() + store.msgCachePool.stop() } func (store *Store) close() error { @@ -457,6 +457,8 @@ func (store *Store) Remove() error { } func (store *Store) RemoveCache() error { + store.stopWatcher() + if err := store.clearCachePassphrase(); err != nil { logrus.WithError(err).Error("Failed to clear cache passphrase") } diff --git a/internal/store/user_message.go b/internal/store/user_message.go index 9b06b890..5bf001e8 100644 --- a/internal/store/user_message.go +++ b/internal/store/user_message.go @@ -304,7 +304,7 @@ func (store *Store) createOrUpdateMessagesEvent(msgs []*pmapi.Message) error { / // Notify the cacher that it should start caching messages. for _, msg := range msgs { - store.cacher.newJob(msg.ID) + store.msgCachePool.newJob(msg.ID) } return nil diff --git a/internal/users/cache.go b/internal/users/cache.go index 1bc4e182..eb1e8a91 100644 --- a/internal/users/cache.go +++ b/internal/users/cache.go @@ -188,8 +188,6 @@ func (u *Users) EnableCache() error { } func (u *Users) DisableCache() error { - // NOTE(GODT-1158): Is it an error if we can't remove a user's cache? - for _, user := range u.users { if err := user.store.RemoveCache(); err != nil { logrus.WithError(err).Error("Failed to remove user's message cache") @@ -202,24 +200,22 @@ func (u *Users) DisableCache() error { // MigrateCache moves the message cache folder from folder srcPath to folder dstPath. // srcPath must point to an existing folder. dstPath must be an empty folder or not exist. func (u *Users) MigrateCache(srcPath, dstPath string) error { - // NOTE(GODT-1158): Is it enough dstPath just close the store? Do we need dstPath force-close the cacher too? - fiSrc, err := os.Stat(srcPath) if os.IsNotExist(err) { - logrus.WithError(err).Error("unknown source for cache migration") - return err + logrus.WithError(err).Warn("Skipping migration: Unknown source for cache migration") + return nil } if !fiSrc.IsDir() { - logrus.WithError(err).Error("cache migration cannot be perform srcPath a file") - return err + logrus.WithError(err).Warn("Skipping migration: srcPath is not a dir") + return nil } if isSubfolderOf(dstPath, srcPath) { - return errors.New("the destination folder is a subfolder of the source folder") + return errors.New("destination folder is a subfolder of the source folder") } if err = checkFolderIsSuitableDestinationForCache(dstPath); err != nil { - logrus.WithError(err).Error("destination folder is not suitable for cache migration") + logrus.WithError(err).Error("The destination folder is not suitable for cache migration") return err } diff --git a/internal/users/user.go b/internal/users/user.go index cbd49d99..e6688d1e 100644 --- a/internal/users/user.go +++ b/internal/users/user.go @@ -120,7 +120,6 @@ func (u *User) connect(client pmapi.Client, creds *credentials.Credentials) erro return err } - // NOTE(GODT-1158): If using in-memory cache we probably shouldn't start the watcher? u.store.StartWatcher() } diff --git a/pkg/pmapi/response.go b/pkg/pmapi/response.go index f79bb90a..93967f9f 100644 --- a/pkg/pmapi/response.go +++ b/pkg/pmapi/response.go @@ -18,6 +18,7 @@ package pmapi import ( + "context" "math/rand" "net/http" "strconv" @@ -141,6 +142,10 @@ func wrapNoConnection(res *resty.Response, err error) (*resty.Response, error) { return res, err } + if errors.Is(err, context.Canceled) { + return res, err + } + if res.RawResponse != nil { return res, err }