GODT-1431 Prevent watcher when not using disk on cache

- change: Rename Cacher -> MsgCachePool
- change: Do not run watcher when using memory cache
- add: Allow to cancel cacher jobs (added context)
- change: Fix behavior on context cancel (was causing no internet)
This commit is contained in:
Jakub
2021-11-22 15:12:15 +01:00
parent b82e2ca176
commit d7c5ace8e4
12 changed files with 81 additions and 58 deletions

View File

@ -61,18 +61,15 @@ func (f *FrontendQt) changeLocalCache(enableDiskCache bool, diskCachePath *core.
_diskCachePath = _diskCachePath[1:] _diskCachePath = _diskCachePath[1:]
} }
// If disk cache not enabled, or path not changed then no need to change location if enableDiskCache && _diskCachePath != f.settings.Get(settings.CacheLocationKey) {
if !enableDiskCache || _diskCachePath == f.settings.Get(settings.CacheLocationKey) { if err := f.bridge.MigrateCache(f.settings.Get(settings.CacheLocationKey), _diskCachePath); err != nil {
return f.log.WithError(err).Error("The local cache location could not be changed.")
f.qml.CacheCantMove()
return
}
f.settings.Set(settings.CacheLocationKey, _diskCachePath)
} }
if err := f.bridge.MigrateCache(f.settings.Get(settings.CacheLocationKey), _diskCachePath); err != nil {
f.log.WithError(err).Error("The local cache location could not be changed.")
f.qml.CacheCantMove()
return
}
f.settings.Set(settings.CacheLocationKey, _diskCachePath)
f.qml.CacheLocationChangeSuccess() f.qml.CacheLocationChangeSuccess()
f.restart() f.restart()
} }

View File

@ -18,6 +18,8 @@
package store package store
import ( import (
"context"
"github.com/ProtonMail/gopenpgp/v2/crypto" "github.com/ProtonMail/gopenpgp/v2/crypto"
"github.com/ProtonMail/proton-bridge/pkg/message" "github.com/ProtonMail/proton-bridge/pkg/message"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -59,7 +61,7 @@ func (store *Store) UnlockCache(kr *crypto.KeyRing) error {
return err return err
} }
store.cacher.start() store.msgCachePool.start()
return nil return nil
} }
@ -112,7 +114,7 @@ func (store *Store) getCachedMessage(messageID string) ([]byte, error) {
return store.cache.Get(store.user.ID(), messageID) return store.cache.Get(store.user.ID(), messageID)
} }
job, done := store.newBuildJob(messageID, message.ForegroundPriority) job, done := store.newBuildJob(context.Background(), messageID, message.ForegroundPriority)
defer done() defer done()
literal, err := job.GetResult() literal, err := job.GetResult()
@ -135,11 +137,11 @@ func (store *Store) IsCached(messageID string) bool {
// BuildAndCacheMessage builds the given message (with background priority) and puts it in the cache. // BuildAndCacheMessage builds the given message (with background priority) and puts it in the cache.
// It builds with background priority. // It builds with background priority.
func (store *Store) BuildAndCacheMessage(messageID string) error { func (store *Store) BuildAndCacheMessage(ctx context.Context, messageID string) error {
buildAndCacheJobs <- struct{}{} buildAndCacheJobs <- struct{}{}
defer func() { <-buildAndCacheJobs }() defer func() { <-buildAndCacheJobs }()
job, done := store.newBuildJob(messageID, message.BackgroundPriority) job, done := store.newBuildJob(ctx, messageID, message.BackgroundPriority)
defer done() defer done()
literal, err := job.GetResult() literal, err := job.GetResult()

View File

@ -34,6 +34,12 @@ import (
var ErrLowSpace = errors.New("not enough free space left on device") var ErrLowSpace = errors.New("not enough free space left on device")
// IsOnDiskCache will return true if Cache is type of onDiskCache.
func IsOnDiskCache(c Cache) bool {
_, ok := c.(*onDiskCache)
return ok
}
type onDiskCache struct { type onDiskCache struct {
path string path string
opts Options opts Options

View File

@ -17,14 +17,27 @@
package store package store
import "time" import (
"context"
"time"
"github.com/ProtonMail/proton-bridge/internal/store/cache"
)
func (store *Store) StartWatcher() { func (store *Store) StartWatcher() {
if !cache.IsOnDiskCache(store.cache) {
return
}
store.done = make(chan struct{}) store.done = make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
store.msgCachePool.ctx = ctx
go func() { go func() {
ticker := time.NewTicker(10 * time.Minute) ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop() defer ticker.Stop()
defer cancel()
for { for {
// NOTE(GODT-1158): Race condition here? What if DB was already closed? // NOTE(GODT-1158): Race condition here? What if DB was already closed?
@ -35,7 +48,7 @@ func (store *Store) StartWatcher() {
for _, messageID := range messageIDs { for _, messageID := range messageIDs {
if !store.IsCached(messageID) { if !store.IsCached(messageID) {
store.cacher.newJob(messageID) store.msgCachePool.newJob(messageID)
} }
} }

View File

@ -18,35 +18,38 @@
package store package store
import ( import (
"context"
"sync" "sync"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
type Cacher struct { type MsgCachePool struct {
storer Storer storer Storer
jobs chan string jobs chan string
done chan struct{} done chan struct{}
started bool started bool
wg *sync.WaitGroup wg *sync.WaitGroup
ctx context.Context
} }
type Storer interface { type Storer interface {
IsCached(messageID string) bool IsCached(messageID string) bool
BuildAndCacheMessage(messageID string) error BuildAndCacheMessage(ctx context.Context, messageID string) error
} }
func newCacher(storer Storer) *Cacher { func newMsgCachePool(storer Storer) *MsgCachePool {
return &Cacher{ return &MsgCachePool{
storer: storer, storer: storer,
jobs: make(chan string), jobs: make(chan string),
done: make(chan struct{}), done: make(chan struct{}),
wg: &sync.WaitGroup{}, wg: &sync.WaitGroup{},
ctx: context.Background(),
} }
} }
// newJob sends a new job to the cacher if it's running. // newJob sends a new job to the cacher if it's running.
func (cacher *Cacher) newJob(messageID string) { func (cacher *MsgCachePool) newJob(messageID string) {
if !cacher.started { if !cacher.started {
return return
} }
@ -63,7 +66,7 @@ func (cacher *Cacher) newJob(messageID string) {
} }
} }
func (cacher *Cacher) start() { func (cacher *MsgCachePool) start() {
cacher.started = true cacher.started = true
go func() { go func() {
@ -79,17 +82,17 @@ func (cacher *Cacher) start() {
}() }()
} }
func (cacher *Cacher) handleJob(messageID string) { func (cacher *MsgCachePool) handleJob(messageID string) {
defer cacher.wg.Done() defer cacher.wg.Done()
if err := cacher.storer.BuildAndCacheMessage(messageID); err != nil { if err := cacher.storer.BuildAndCacheMessage(cacher.ctx, messageID); err != nil {
logrus.WithError(err).Error("Failed to build and cache message") logrus.WithError(err).Error("Failed to build and cache message")
} else { } else {
logrus.WithField("messageID", messageID).Trace("Message cached") logrus.WithField("messageID", messageID).Trace("Message cached")
} }
} }
func (cacher *Cacher) stop() { func (cacher *MsgCachePool) stop() {
cacher.started = false cacher.started = false
cacher.wg.Wait() cacher.wg.Wait()

View File

@ -25,7 +25,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
func withTestCacher(t *testing.T, doTest func(storer *storemocks.MockStorer, cacher *Cacher)) { func withTestCacher(t *testing.T, doTest func(storer *storemocks.MockStorer, cacher *MsgCachePool)) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
@ -33,7 +33,7 @@ func withTestCacher(t *testing.T, doTest func(storer *storemocks.MockStorer, cac
storer := storemocks.NewMockStorer(ctrl) storer := storemocks.NewMockStorer(ctrl)
// Create a new cacher pointing to the fake store. // Create a new cacher pointing to the fake store.
cacher := newCacher(storer) cacher := newMsgCachePool(storer)
// Start the cacher and wait for it to stop. // Start the cacher and wait for it to stop.
cacher.start() cacher.start()
@ -44,16 +44,16 @@ func withTestCacher(t *testing.T, doTest func(storer *storemocks.MockStorer, cac
func TestCacher(t *testing.T) { func TestCacher(t *testing.T) {
// If the message is not yet cached, we should expect to try to build and cache it. // If the message is not yet cached, we should expect to try to build and cache it.
withTestCacher(t, func(storer *storemocks.MockStorer, cacher *Cacher) { withTestCacher(t, func(storer *storemocks.MockStorer, cacher *MsgCachePool) {
storer.EXPECT().IsCached("messageID").Return(false) storer.EXPECT().IsCached("messageID").Return(false)
storer.EXPECT().BuildAndCacheMessage("messageID").Return(nil) storer.EXPECT().BuildAndCacheMessage(cacher.ctx, "messageID").Return(nil)
cacher.newJob("messageID") cacher.newJob("messageID")
}) })
} }
func TestCacherAlreadyCached(t *testing.T) { func TestCacherAlreadyCached(t *testing.T) {
// If the message is already cached, we should not try to build it. // If the message is already cached, we should not try to build it.
withTestCacher(t, func(storer *storemocks.MockStorer, cacher *Cacher) { withTestCacher(t, func(storer *storemocks.MockStorer, cacher *MsgCachePool) {
storer.EXPECT().IsCached("messageID").Return(true) storer.EXPECT().IsCached("messageID").Return(true)
cacher.newJob("messageID") cacher.newJob("messageID")
}) })
@ -61,9 +61,9 @@ func TestCacherAlreadyCached(t *testing.T) {
func TestCacherFail(t *testing.T) { func TestCacherFail(t *testing.T) {
// If building the message fails, we should not try to cache it. // If building the message fails, we should not try to cache it.
withTestCacher(t, func(storer *storemocks.MockStorer, cacher *Cacher) { withTestCacher(t, func(storer *storemocks.MockStorer, cacher *MsgCachePool) {
storer.EXPECT().IsCached("messageID").Return(false) storer.EXPECT().IsCached("messageID").Return(false)
storer.EXPECT().BuildAndCacheMessage("messageID").Return(errors.New("failed to build message")) storer.EXPECT().BuildAndCacheMessage(cacher.ctx, "messageID").Return(errors.New("failed to build message"))
cacher.newJob("messageID") cacher.newJob("messageID")
}) })
} }
@ -76,14 +76,14 @@ func TestCacherStop(t *testing.T) {
storer := storemocks.NewMockStorer(ctrl) storer := storemocks.NewMockStorer(ctrl)
// Create a new cacher pointing to the fake store. // Create a new cacher pointing to the fake store.
cacher := newCacher(storer) cacher := newMsgCachePool(storer)
// Start the cacher. // Start the cacher.
cacher.start() cacher.start()
// Send a job -- this should succeed. // Send a job -- this should succeed.
storer.EXPECT().IsCached("messageID").Return(false) storer.EXPECT().IsCached("messageID").Return(false)
storer.EXPECT().BuildAndCacheMessage("messageID").Return(nil) storer.EXPECT().BuildAndCacheMessage(cacher.ctx, "messageID").Return(nil)
cacher.newJob("messageID") cacher.newJob("messageID")
// Stop the cacher. // Stop the cacher.

View File

@ -355,17 +355,17 @@ func (m *MockStorer) EXPECT() *MockStorerMockRecorder {
} }
// BuildAndCacheMessage mocks base method. // BuildAndCacheMessage mocks base method.
func (m *MockStorer) BuildAndCacheMessage(arg0 string) error { func (m *MockStorer) BuildAndCacheMessage(arg0 context.Context, arg1 string) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BuildAndCacheMessage", arg0) ret := m.ctrl.Call(m, "BuildAndCacheMessage", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// BuildAndCacheMessage indicates an expected call of BuildAndCacheMessage. // BuildAndCacheMessage indicates an expected call of BuildAndCacheMessage.
func (mr *MockStorerMockRecorder) BuildAndCacheMessage(arg0 interface{}) *gomock.Call { func (mr *MockStorerMockRecorder) BuildAndCacheMessage(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildAndCacheMessage", reflect.TypeOf((*MockStorer)(nil).BuildAndCacheMessage), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildAndCacheMessage", reflect.TypeOf((*MockStorer)(nil).BuildAndCacheMessage), arg0, arg1)
} }
// IsCached mocks base method. // IsCached mocks base method.

View File

@ -135,10 +135,10 @@ type Store struct {
addresses map[string]*Address addresses map[string]*Address
notifier ChangeNotifier notifier ChangeNotifier
builder *message.Builder builder *message.Builder
cache cache.Cache cache cache.Cache
cacher *Cacher msgCachePool *MsgCachePool
done chan struct{} done chan struct{}
isSyncRunning bool isSyncRunning bool
syncCooldown cooldown syncCooldown cooldown
@ -194,7 +194,7 @@ func New( // nolint[funlen]
// Create a new cacher. It's not started yet. // Create a new cacher. It's not started yet.
// NOTE(GODT-1158): I hate this circular dependency store->cacher->store :( // NOTE(GODT-1158): I hate this circular dependency store->cacher->store :(
store.cacher = newCacher(store) store.msgCachePool = newMsgCachePool(store)
// Minimal increase is event pollInterval, doubles every failed retry up to 5 minutes. // Minimal increase is event pollInterval, doubles every failed retry up to 5 minutes.
store.syncCooldown.setExponentialWait(pollInterval, 2, 5*time.Minute) store.syncCooldown.setExponentialWait(pollInterval, 2, 5*time.Minute)
@ -388,9 +388,9 @@ func (store *Store) addAddress(address, addressID string, labels []*pmapi.Label)
} }
// newBuildJob returns a new build job for the given message using the store's message builder. // newBuildJob returns a new build job for the given message using the store's message builder.
func (store *Store) newBuildJob(messageID string, priority int) (*message.Job, pool.DoneFunc) { func (store *Store) newBuildJob(ctx context.Context, messageID string, priority int) (*message.Job, pool.DoneFunc) {
return store.builder.NewJobWithOptions( return store.builder.NewJobWithOptions(
context.Background(), ctx,
store.client(), store.client(),
messageID, messageID,
message.JobOptions{ message.JobOptions{
@ -421,7 +421,7 @@ func (store *Store) CloseEventLoopAndCacher() {
store.stopWatcher() store.stopWatcher()
store.cacher.stop() store.msgCachePool.stop()
} }
func (store *Store) close() error { func (store *Store) close() error {
@ -457,6 +457,8 @@ func (store *Store) Remove() error {
} }
func (store *Store) RemoveCache() error { func (store *Store) RemoveCache() error {
store.stopWatcher()
if err := store.clearCachePassphrase(); err != nil { if err := store.clearCachePassphrase(); err != nil {
logrus.WithError(err).Error("Failed to clear cache passphrase") logrus.WithError(err).Error("Failed to clear cache passphrase")
} }

View File

@ -304,7 +304,7 @@ func (store *Store) createOrUpdateMessagesEvent(msgs []*pmapi.Message) error { /
// Notify the cacher that it should start caching messages. // Notify the cacher that it should start caching messages.
for _, msg := range msgs { for _, msg := range msgs {
store.cacher.newJob(msg.ID) store.msgCachePool.newJob(msg.ID)
} }
return nil return nil

View File

@ -188,8 +188,6 @@ func (u *Users) EnableCache() error {
} }
func (u *Users) DisableCache() error { func (u *Users) DisableCache() error {
// NOTE(GODT-1158): Is it an error if we can't remove a user's cache?
for _, user := range u.users { for _, user := range u.users {
if err := user.store.RemoveCache(); err != nil { if err := user.store.RemoveCache(); err != nil {
logrus.WithError(err).Error("Failed to remove user's message cache") logrus.WithError(err).Error("Failed to remove user's message cache")
@ -202,24 +200,22 @@ func (u *Users) DisableCache() error {
// MigrateCache moves the message cache folder from folder srcPath to folder dstPath. // MigrateCache moves the message cache folder from folder srcPath to folder dstPath.
// srcPath must point to an existing folder. dstPath must be an empty folder or not exist. // srcPath must point to an existing folder. dstPath must be an empty folder or not exist.
func (u *Users) MigrateCache(srcPath, dstPath string) error { func (u *Users) MigrateCache(srcPath, dstPath string) error {
// NOTE(GODT-1158): Is it enough dstPath just close the store? Do we need dstPath force-close the cacher too?
fiSrc, err := os.Stat(srcPath) fiSrc, err := os.Stat(srcPath)
if os.IsNotExist(err) { if os.IsNotExist(err) {
logrus.WithError(err).Error("unknown source for cache migration") logrus.WithError(err).Warn("Skipping migration: Unknown source for cache migration")
return err return nil
} }
if !fiSrc.IsDir() { if !fiSrc.IsDir() {
logrus.WithError(err).Error("cache migration cannot be perform srcPath a file") logrus.WithError(err).Warn("Skipping migration: srcPath is not a dir")
return err return nil
} }
if isSubfolderOf(dstPath, srcPath) { if isSubfolderOf(dstPath, srcPath) {
return errors.New("the destination folder is a subfolder of the source folder") return errors.New("destination folder is a subfolder of the source folder")
} }
if err = checkFolderIsSuitableDestinationForCache(dstPath); err != nil { if err = checkFolderIsSuitableDestinationForCache(dstPath); err != nil {
logrus.WithError(err).Error("destination folder is not suitable for cache migration") logrus.WithError(err).Error("The destination folder is not suitable for cache migration")
return err return err
} }

View File

@ -120,7 +120,6 @@ func (u *User) connect(client pmapi.Client, creds *credentials.Credentials) erro
return err return err
} }
// NOTE(GODT-1158): If using in-memory cache we probably shouldn't start the watcher?
u.store.StartWatcher() u.store.StartWatcher()
} }

View File

@ -18,6 +18,7 @@
package pmapi package pmapi
import ( import (
"context"
"math/rand" "math/rand"
"net/http" "net/http"
"strconv" "strconv"
@ -141,6 +142,10 @@ func wrapNoConnection(res *resty.Response, err error) (*resty.Response, error) {
return res, err return res, err
} }
if errors.Is(err, context.Canceled) {
return res, err
}
if res.RawResponse != nil { if res.RawResponse != nil {
return res, err return res, err
} }