forked from Silverfish/proton-bridge
GODT-1077 IMAP sync counting
This commit is contained in:
@ -226,7 +226,7 @@ func (im *imapMailbox) importMessage(m *pmapi.Message, readers []io.Reader, kr *
|
|||||||
return im.storeMailbox.ImportMessage(m, body, labels)
|
return im.storeMailbox.ImportMessage(m, body, labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (im *imapMailbox) getMessage(storeMessage storeMessageProvider, items []imap.FetchItem) (msg *imap.Message, err error) {
|
func (im *imapMailbox) getMessage(storeMessage storeMessageProvider, items []imap.FetchItem, msgBuildCountHistogram *msgBuildCountHistogram) (msg *imap.Message, err error) { //nolint[funlen]
|
||||||
im.log.WithField("msgID", storeMessage.ID()).Trace("Getting message")
|
im.log.WithField("msgID", storeMessage.ID()).Trace("Getting message")
|
||||||
|
|
||||||
seqNum, err := storeMessage.SequenceNumber()
|
seqNum, err := storeMessage.SequenceNumber()
|
||||||
@ -268,7 +268,12 @@ func (im *imapMailbox) getMessage(storeMessage storeMessageProvider, items []ima
|
|||||||
// on our part and we need to compute "real" size of decrypted data.
|
// on our part and we need to compute "real" size of decrypted data.
|
||||||
if m.Size <= 0 {
|
if m.Size <= 0 {
|
||||||
im.log.WithField("msgID", storeMessage.ID()).Trace("Size unknown - downloading body")
|
im.log.WithField("msgID", storeMessage.ID()).Trace("Size unknown - downloading body")
|
||||||
if _, _, err = im.getBodyAndStructure(storeMessage); err != nil {
|
// We are sure the size is not a problem right now. Clients
|
||||||
|
// might not first check sizes of all messages so we couldn't
|
||||||
|
// be sure if seeing 1st or 2nd sync is all right or not.
|
||||||
|
// Therefore, it's better to exclude getting size from the
|
||||||
|
// counting and see build count as real message build.
|
||||||
|
if _, _, err = im.getBodyAndStructure(storeMessage, nil); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -279,7 +284,7 @@ func (im *imapMailbox) getMessage(storeMessage storeMessageProvider, items []ima
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
if err = im.getLiteralForSection(item, msg, storeMessage); err != nil {
|
if err = im.getLiteralForSection(item, msg, storeMessage, msgBuildCountHistogram); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -288,14 +293,14 @@ func (im *imapMailbox) getMessage(storeMessage storeMessageProvider, items []ima
|
|||||||
return msg, err
|
return msg, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (im *imapMailbox) getLiteralForSection(itemSection imap.FetchItem, msg *imap.Message, storeMessage storeMessageProvider) error {
|
func (im *imapMailbox) getLiteralForSection(itemSection imap.FetchItem, msg *imap.Message, storeMessage storeMessageProvider, msgBuildCountHistogram *msgBuildCountHistogram) error {
|
||||||
section, err := imap.ParseBodySectionName(itemSection)
|
section, err := imap.ParseBodySectionName(itemSection)
|
||||||
if err != nil { // Ignore error
|
if err != nil { // Ignore error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var literal imap.Literal
|
var literal imap.Literal
|
||||||
if literal, err = im.getMessageBodySection(storeMessage, section); err != nil {
|
if literal, err = im.getMessageBodySection(storeMessage, section, msgBuildCountHistogram); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -313,14 +318,19 @@ func (im *imapMailbox) getBodyStructure(storeMessage storeMessageProvider) (bs *
|
|||||||
im.log.WithError(err).Debug("Fail to retrieve bodystructure from database")
|
im.log.WithError(err).Debug("Fail to retrieve bodystructure from database")
|
||||||
}
|
}
|
||||||
if bs == nil {
|
if bs == nil {
|
||||||
if bs, _, err = im.getBodyAndStructure(storeMessage); err != nil {
|
// We are sure the body structure is not a problem right now.
|
||||||
|
// Clients might do first fetch body structure so we couldn't
|
||||||
|
// be sure if seeing 1st or 2nd sync is all right or not.
|
||||||
|
// Therefore, it's better to exclude first body structure fetch
|
||||||
|
// from the counting and see build count as real message build.
|
||||||
|
if bs, _, err = im.getBodyAndStructure(storeMessage, nil); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (im *imapMailbox) getBodyAndStructure(storeMessage storeMessageProvider) (
|
func (im *imapMailbox) getBodyAndStructure(storeMessage storeMessageProvider, msgBuildCountHistogram *msgBuildCountHistogram) (
|
||||||
structure *message.BodyStructure,
|
structure *message.BodyStructure,
|
||||||
bodyReader *bytes.Reader, err error,
|
bodyReader *bytes.Reader, err error,
|
||||||
) {
|
) {
|
||||||
@ -352,6 +362,15 @@ func (im *imapMailbox) getBodyAndStructure(storeMessage storeMessageProvider) (
|
|||||||
WithField("msgID", m.ID).
|
WithField("msgID", m.ID).
|
||||||
Warn("Cannot update header while building")
|
Warn("Cannot update header while building")
|
||||||
}
|
}
|
||||||
|
if msgBuildCountHistogram != nil {
|
||||||
|
times, err := storeMessage.IncreaseBuildCount()
|
||||||
|
if err != nil {
|
||||||
|
im.log.WithError(err).
|
||||||
|
WithField("msgID", m.ID).
|
||||||
|
Warn("Cannot increase build count")
|
||||||
|
}
|
||||||
|
msgBuildCountHistogram.add(times)
|
||||||
|
}
|
||||||
// Drafts can change and we don't want to cache them.
|
// Drafts can change and we don't want to cache them.
|
||||||
if !isMessageInDraftFolder(m) {
|
if !isMessageInDraftFolder(m) {
|
||||||
cache.SaveMail(id, body, structure)
|
cache.SaveMail(id, body, structure)
|
||||||
@ -379,7 +398,7 @@ func isMessageInDraftFolder(m *pmapi.Message) bool {
|
|||||||
|
|
||||||
// This will download message (or read from cache) and pick up the section,
|
// This will download message (or read from cache) and pick up the section,
|
||||||
// extract data (header,body, both) and trim the output if needed.
|
// extract data (header,body, both) and trim the output if needed.
|
||||||
func (im *imapMailbox) getMessageBodySection(storeMessage storeMessageProvider, section *imap.BodySectionName) (literal imap.Literal, err error) { // nolint[funlen]
|
func (im *imapMailbox) getMessageBodySection(storeMessage storeMessageProvider, section *imap.BodySectionName, msgBuildCountHistogram *msgBuildCountHistogram) (literal imap.Literal, err error) { // nolint[funlen]
|
||||||
var (
|
var (
|
||||||
structure *message.BodyStructure
|
structure *message.BodyStructure
|
||||||
bodyReader *bytes.Reader
|
bodyReader *bytes.Reader
|
||||||
@ -410,7 +429,7 @@ func (im *imapMailbox) getMessageBodySection(storeMessage storeMessageProvider,
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// The rest of cases need download and decrypt.
|
// The rest of cases need download and decrypt.
|
||||||
structure, bodyReader, err = im.getBodyAndStructure(storeMessage)
|
structure, bodyReader, err = im.getBodyAndStructure(storeMessage, msgBuildCountHistogram)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@ -482,12 +482,13 @@ func (im *imapMailbox) SearchMessages(isUID bool, criteria *imap.SearchCriteria)
|
|||||||
//
|
//
|
||||||
// Messages must be sent to msgResponse. When the function returns, msgResponse must be closed.
|
// Messages must be sent to msgResponse. When the function returns, msgResponse must be closed.
|
||||||
func (im *imapMailbox) ListMessages(isUID bool, seqSet *imap.SeqSet, items []imap.FetchItem, msgResponse chan<- *imap.Message) error {
|
func (im *imapMailbox) ListMessages(isUID bool, seqSet *imap.SeqSet, items []imap.FetchItem, msgResponse chan<- *imap.Message) error {
|
||||||
|
msgBuildCountHistogram := newMsgBuildCountHistogram()
|
||||||
return im.logCommand(func() error {
|
return im.logCommand(func() error {
|
||||||
return im.listMessages(isUID, seqSet, items, msgResponse)
|
return im.listMessages(isUID, seqSet, items, msgResponse, msgBuildCountHistogram)
|
||||||
}, "FETCH", isUID, seqSet, items)
|
}, "FETCH", isUID, seqSet, items, msgBuildCountHistogram)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (im *imapMailbox) listMessages(isUID bool, seqSet *imap.SeqSet, items []imap.FetchItem, msgResponse chan<- *imap.Message) (err error) { //nolint[funlen]
|
func (im *imapMailbox) listMessages(isUID bool, seqSet *imap.SeqSet, items []imap.FetchItem, msgResponse chan<- *imap.Message, msgBuildCountHistogram *msgBuildCountHistogram) (err error) { //nolint[funlen]
|
||||||
defer func() {
|
defer func() {
|
||||||
close(msgResponse)
|
close(msgResponse)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -544,7 +545,7 @@ func (im *imapMailbox) listMessages(isUID bool, seqSet *imap.SeqSet, items []ima
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
msg, err := im.getMessage(storeMessage, items)
|
msg, err := im.getMessage(storeMessage, items, msgBuildCountHistogram)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("list message build: %v", err)
|
err = fmt.Errorf("list message build: %v", err)
|
||||||
l.WithField("metaID", storeMessage.ID()).Error(err)
|
l.WithField("metaID", storeMessage.ID()).Error(err)
|
||||||
|
|||||||
65
internal/imap/msg_build_counts.go
Normal file
65
internal/imap/msg_build_counts.go
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
// Copyright (c) 2021 Proton Technologies AG
|
||||||
|
//
|
||||||
|
// This file is part of ProtonMail Bridge.
|
||||||
|
//
|
||||||
|
// ProtonMail Bridge is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// ProtonMail Bridge is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with ProtonMail Bridge. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package imap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// msgBuildCountHistogram is used to analyse and log the number of repetitive
|
||||||
|
// downloads of requested messages per one fetch. The number of builds per each
|
||||||
|
// messageID is stored in persistent database. The msgBuildCountHistogram will
|
||||||
|
// take this number for each message in ongoing fetch and create histogram of
|
||||||
|
// repeats.
|
||||||
|
//
|
||||||
|
// Example: During `fetch 1:300` there were
|
||||||
|
// - 100 messages were downloaded first time
|
||||||
|
// - 100 messages were downloaded second time
|
||||||
|
// - 99 messages were downloaded 10th times
|
||||||
|
// - 1 messages were downloaded 100th times
|
||||||
|
type msgBuildCountHistogram struct {
|
||||||
|
// Key represents how many times message was build.
|
||||||
|
// Value stores how many messages are build X times based on the key.
|
||||||
|
counts map[uint32]uint32
|
||||||
|
lock sync.Locker
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMsgBuildCountHistogram() *msgBuildCountHistogram {
|
||||||
|
return &msgBuildCountHistogram{
|
||||||
|
counts: map[uint32]uint32{},
|
||||||
|
lock: &sync.Mutex{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *msgBuildCountHistogram) String() string {
|
||||||
|
res := ""
|
||||||
|
for nRebuild, counts := range c.counts {
|
||||||
|
if res != "" {
|
||||||
|
res += ", "
|
||||||
|
}
|
||||||
|
res += fmt.Sprintf("[%d]:%d", nRebuild, counts)
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *msgBuildCountHistogram) add(nRebuild uint32) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
c.counts[nRebuild]++
|
||||||
|
}
|
||||||
@ -103,6 +103,7 @@ type storeMessageProvider interface {
|
|||||||
SetContentTypeAndHeader(string, mail.Header) error
|
SetContentTypeAndHeader(string, mail.Header) error
|
||||||
SetBodyStructure(*pkgMsg.BodyStructure) error
|
SetBodyStructure(*pkgMsg.BodyStructure) error
|
||||||
GetBodyStructure() (*pkgMsg.BodyStructure, error)
|
GetBodyStructure() (*pkgMsg.BodyStructure, error)
|
||||||
|
IncreaseBuildCount() (uint32, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type storeUserWrap struct {
|
type storeUserWrap struct {
|
||||||
|
|||||||
@ -148,3 +148,17 @@ func (message *Message) GetBodyStructure() (bs *pkgMsg.BodyStructure, err error)
|
|||||||
}
|
}
|
||||||
return bs, nil
|
return bs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (message *Message) IncreaseBuildCount() (times uint32, err error) {
|
||||||
|
txUpdate := func(tx *bolt.Tx) error {
|
||||||
|
times, err = message.store.txIncreaseMsgBuildCount(
|
||||||
|
tx.Bucket(msgBuildCountBucket),
|
||||||
|
message.ID(),
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = message.store.db.Update(txUpdate); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return times, nil
|
||||||
|
}
|
||||||
|
|||||||
@ -54,6 +54,8 @@ var (
|
|||||||
// * {messageID} -> message data (subject, from, to, time, headers, body size, ...)
|
// * {messageID} -> message data (subject, from, to, time, headers, body size, ...)
|
||||||
// * bodystructure
|
// * bodystructure
|
||||||
// * {messageID} -> message body structure
|
// * {messageID} -> message body structure
|
||||||
|
// * msgbuildcount
|
||||||
|
// * {messageID} -> uint32 number of message builds to track re-sync issues
|
||||||
// * counts
|
// * counts
|
||||||
// * {mailboxID} -> mailboxCounts: totalOnAPI, unreadOnAPI, labelName, labelColor, labelIsExclusive
|
// * {mailboxID} -> mailboxCounts: totalOnAPI, unreadOnAPI, labelName, labelColor, labelIsExclusive
|
||||||
// * address_info
|
// * address_info
|
||||||
@ -76,6 +78,7 @@ var (
|
|||||||
// * {messageID} -> true
|
// * {messageID} -> true
|
||||||
metadataBucket = []byte("metadata") //nolint[gochecknoglobals]
|
metadataBucket = []byte("metadata") //nolint[gochecknoglobals]
|
||||||
bodystructureBucket = []byte("bodystructure") //nolint[gochecknoglobals]
|
bodystructureBucket = []byte("bodystructure") //nolint[gochecknoglobals]
|
||||||
|
msgBuildCountBucket = []byte("msgbuildcount") //nolint[gochecknoglobals]
|
||||||
countsBucket = []byte("counts") //nolint[gochecknoglobals]
|
countsBucket = []byte("counts") //nolint[gochecknoglobals]
|
||||||
addressInfoBucket = []byte("address_info") //nolint[gochecknoglobals]
|
addressInfoBucket = []byte("address_info") //nolint[gochecknoglobals]
|
||||||
addressModeBucket = []byte("address_mode") //nolint[gochecknoglobals]
|
addressModeBucket = []byte("address_mode") //nolint[gochecknoglobals]
|
||||||
@ -204,6 +207,10 @@ func openBoltDatabase(filePath string) (db *bolt.DB, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, err = tx.CreateBucketIfNotExists(msgBuildCountBucket); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if _, err = tx.CreateBucketIfNotExists(countsBucket); err != nil {
|
if _, err = tx.CreateBucketIfNotExists(countsBucket); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@ -191,6 +191,19 @@ func (store *Store) txGetBodyStructure(bsBucket *bolt.Bucket, msgID string) (*pk
|
|||||||
return pkgMsg.DeserializeBodyStructure(raw)
|
return pkgMsg.DeserializeBodyStructure(raw)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (store *Store) txIncreaseMsgBuildCount(b *bolt.Bucket, msgID string) (uint32, error) {
|
||||||
|
key := []byte(msgID)
|
||||||
|
count := uint32(0)
|
||||||
|
|
||||||
|
raw := b.Get(key)
|
||||||
|
if raw != nil {
|
||||||
|
count = btoi(raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
count++
|
||||||
|
return count, b.Put(key, itob(count))
|
||||||
|
}
|
||||||
|
|
||||||
// createOrUpdateMessageEvent is helper to create only one message with
|
// createOrUpdateMessageEvent is helper to create only one message with
|
||||||
// createOrUpdateMessagesEvent.
|
// createOrUpdateMessagesEvent.
|
||||||
func (store *Store) createOrUpdateMessageEvent(msg *pmapi.Message) error {
|
func (store *Store) createOrUpdateMessageEvent(msg *pmapi.Message) error {
|
||||||
|
|||||||
Reference in New Issue
Block a user