LCOV - code coverage report
Current view: top level - src - wal.cc (source / functions) Hit Total Coverage
Test: coverage.info.cleaned Lines: 497 505 98.4 %
Date: 2015-01-12 15:17:13 Functions: 36 36 100.0 %
Branches: 208 244 85.2 %

           Branch data     Line data    Source code
       1                 :            : /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
       2                 :            : /*
       3                 :            :  *     Copyright 2010 Couchbase, Inc
       4                 :            :  *
       5                 :            :  *   Licensed under the Apache License, Version 2.0 (the "License");
       6                 :            :  *   you may not use this file except in compliance with the License.
       7                 :            :  *   You may obtain a copy of the License at
       8                 :            :  *
       9                 :            :  *       http://www.apache.org/licenses/LICENSE-2.0
      10                 :            :  *
      11                 :            :  *   Unless required by applicable law or agreed to in writing, software
      12                 :            :  *   distributed under the License is distributed on an "AS IS" BASIS,
      13                 :            :  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      14                 :            :  *   See the License for the specific language governing permissions and
      15                 :            :  *   limitations under the License.
      16                 :            :  */
      17                 :            : 
      18                 :            : #include <stdlib.h>
      19                 :            : #include <string.h>
      20                 :            : #include <assert.h>
      21                 :            : #include <stdint.h>
      22                 :            : 
      23                 :            : #include "filemgr.h"
      24                 :            : #include "common.h"
      25                 :            : #include "hash.h"
      26                 :            : #include "docio.h"
      27                 :            : #include "wal.h"
      28                 :            : #include "hash_functions.h"
      29                 :            : #include "fdb_internal.h"
      30                 :            : 
      31                 :            : #include "memleak.h"
      32                 :            : 
      33                 :            : #ifdef __DEBUG
      34                 :            : #ifndef __DEBUG_WAL
      35                 :            :     #undef DBG
      36                 :            :     #undef DBGCMD
      37                 :            :     #undef DBGSW
      38                 :            :     #define DBG(...)
      39                 :            :     #define DBGCMD(...)
      40                 :            :     #define DBGSW(n, ...)
      41                 :            : #endif
      42                 :            : #endif
      43                 :            : 
      44                 :   13952686 : INLINE uint32_t _wal_hash_bykey(struct hash *hash, struct hash_elem *e)
      45                 :            : {
      46                 :   13952686 :     struct wal_item_header *item = _get_entry(e, struct wal_item_header, he_key);
      47                 :   13952686 :     return chksum((uint8_t*)item->key, item->keylen) & ((uint64_t)hash->nbuckets - 1);
      48                 :            : }
      49                 :            : 
      50                 :   25607864 : INLINE int _wal_cmp_bykey(struct hash_elem *a, struct hash_elem *b)
      51                 :            : {
      52                 :            :     struct wal_item_header *aa, *bb;
      53                 :   25607864 :     aa = _get_entry(a, struct wal_item_header, he_key);
      54                 :   25607864 :     bb = _get_entry(b, struct wal_item_header, he_key);
      55                 :            : 
      56         [ +  + ]:   25607864 :     if (aa->keylen == bb->keylen) return memcmp(aa->key, bb->key, aa->keylen);
      57                 :            :     else {
      58         [ +  + ]:       3868 :         size_t len = MIN(aa->keylen , bb->keylen);
      59                 :       3868 :         int cmp = memcmp(aa->key, bb->key, len);
      60         [ +  - ]:       3868 :         if (cmp != 0) return cmp;
      61                 :            :         else {
      62                 :   25607864 :             return (int)((int)aa->keylen - (int)bb->keylen);
      63                 :            :         }
      64                 :            :     }
      65                 :            : }
      66                 :            : 
      67                 :    8587157 : INLINE uint32_t _wal_hash_byseq(struct hash *hash, struct hash_elem *e)
      68                 :            : {
      69                 :    8587157 :     struct wal_item *item = _get_entry(e, struct wal_item, he_seq);
      70                 :    8587157 :     return (item->seqnum) & ((uint64_t)hash->nbuckets - 1);
      71                 :            : }
      72                 :            : 
      73                 :   16310856 : INLINE int _wal_cmp_byseq(struct hash_elem *a, struct hash_elem *b)
      74                 :            : {
      75                 :            :     struct wal_item *aa, *bb;
      76                 :   16310856 :     aa = _get_entry(a, struct wal_item, he_seq);
      77                 :   16310856 :     bb = _get_entry(b, struct wal_item, he_seq);
      78                 :            : 
      79         [ +  + ]:   16310856 :     if (aa->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
      80                 :            :         // multi KV instance mode
      81                 :            :         fdb_kvs_id_t *id_a, *id_b;
      82                 :            :         fdb_kvs_id_t id_aa, id_bb;
      83                 :            :         // KV ID is stored at the first 8 bytes in the key
      84                 :   16309830 :         id_a = (fdb_kvs_id_t *)aa->header->key;
      85                 :   16309830 :         id_b = (fdb_kvs_id_t *)bb->header->key;
      86                 :   16309830 :         id_aa = _endian_decode(*id_a);
      87                 :   16309830 :         id_bb = _endian_decode(*id_b);
      88         [ +  + ]:   16309830 :         if (id_aa < id_bb) {
      89                 :      26864 :             return -1;
      90         [ +  + ]:   16282966 :         } else if (id_aa > id_bb) {
      91                 :      11814 :             return 1;
      92                 :            :         } else {
      93                 :   16271152 :             return _CMP_U64(aa->seqnum, bb->seqnum);
      94                 :            :         }
      95                 :            :     } else {
      96                 :   16310856 :         return _CMP_U64(aa->seqnum, bb->seqnum);
      97                 :            :     }
      98                 :            : }
      99                 :            : 
     100                 :        248 : fdb_status wal_init(struct filemgr *file, int nbucket)
     101                 :            : {
     102                 :        248 :     file->wal->flag = WAL_FLAG_INITIALIZED;
     103                 :        248 :     file->wal->size = 0;
     104                 :        248 :     file->wal->num_flushable = 0;
     105                 :        248 :     file->wal->datasize = 0;
     106                 :        248 :     file->wal->wal_dirty = FDB_WAL_CLEAN;
     107                 :        248 :     hash_init(&file->wal->hash_bykey, nbucket, _wal_hash_bykey, _wal_cmp_bykey);
     108                 :        248 :     hash_init(&file->wal->hash_byseq, nbucket, _wal_hash_byseq, _wal_cmp_byseq);
     109                 :        248 :     list_init(&file->wal->list);
     110                 :        248 :     list_init(&file->wal->txn_list);
     111                 :        248 :     spin_init(&file->wal->lock);
     112                 :            : 
     113                 :            :     DBG("wal item size %d\n", (int)sizeof(struct wal_item));
     114                 :        248 :     return FDB_RESULT_SUCCESS;
     115                 :            : }
     116                 :            : 
     117                 :        745 : int wal_is_initialized(struct filemgr *file)
     118                 :            : {
     119                 :        745 :     return file->wal->flag & WAL_FLAG_INITIALIZED;
     120                 :            : }
     121                 :            : 
     122                 :    4332511 : static fdb_status _wal_insert(fdb_txn *txn,
     123                 :            :                               struct filemgr *file,
     124                 :            :                               fdb_doc *doc,
     125                 :            :                               uint64_t offset,
     126                 :            :                               int is_compactor)
     127                 :            : {
     128                 :            :     struct wal_item *item;
     129                 :            :     struct wal_item_header query, *header;
     130                 :            :     struct list_elem *le;
     131                 :            :     struct hash_elem *he;
     132                 :    4332511 :     void *key = doc->key;
     133                 :    4332511 :     size_t keylen = doc->keylen;
     134                 :            :     fdb_kvs_id_t kv_id, *_kv_id;
     135                 :            : 
     136         [ +  + ]:    4332511 :     if (file->kv_header) { // multi KV instance mode
     137                 :    4331485 :         _kv_id = (fdb_kvs_id_t*)doc->key;
     138                 :    4331485 :         kv_id = _endian_decode(*_kv_id);
     139                 :            :     } else {
     140                 :       1026 :         kv_id = 0;
     141                 :            :     }
     142                 :    4332511 :     query.key = key;
     143                 :    4332511 :     query.keylen = keylen;
     144                 :            : 
     145                 :    4332511 :     spin_lock(&file->wal->lock);
     146                 :            : 
     147                 :    4332511 :     he = hash_find(&file->wal->hash_bykey, &query.he_key);
     148                 :            : 
     149         [ +  + ]:    4332509 :     if (he) {
     150                 :            :         // already exist .. retrieve header
     151                 :      51604 :         header = _get_entry(he, struct wal_item_header, he_key);
     152                 :            : 
     153                 :            :         // if this entry is inserted by compactor, AND
     154                 :            :         // any other COMMITTED entry for the same key already exists,
     155                 :            :         // then we know that the other entry is inserted by the other writer
     156                 :            :         // after compaction is started.
     157                 :            :         // AND also the other entry is always fresher than
     158                 :            :         // the entry inserted by compactor.
     159                 :            :         // Thus, we ignore the entry by compactor if and only if
     160                 :            :         // there is a committed entry for the same key.
     161         [ +  + ]:      51604 :         if (!is_compactor) {
     162                 :            :             // normal insert .. find uncommitted item belonging to the same txn
     163                 :      12034 :             le = list_begin(&header->items);
     164         [ +  + ]:      24088 :             while (le) {
     165                 :      12054 :                 item = _get_entry(le, struct wal_item, list_elem);
     166                 :            : 
     167 [ +  + ][ +  + ]:      12054 :                 if (item->txn == txn && !(item->flag & WAL_ITEM_COMMITTED)) {
     168                 :        229 :                     item->flag &= ~WAL_ITEM_FLUSH_READY;
     169                 :            : 
     170                 :        229 :                     hash_remove(&file->wal->hash_byseq, &item->he_seq);
     171                 :        229 :                     item->seqnum = doc->seqnum;
     172                 :        229 :                     hash_insert(&file->wal->hash_byseq, &item->he_seq);
     173                 :            : 
     174                 :        229 :                     file->wal->datasize -= item->doc_size;
     175                 :        229 :                     file->wal->datasize += doc->size_ondisk;
     176                 :        229 :                     item->doc_size = doc->size_ondisk;
     177                 :        229 :                     item->offset = offset;
     178         [ +  + ]:        229 :                     item->action = doc->deleted ? WAL_ACT_LOGICAL_REMOVE : WAL_ACT_INSERT;
     179                 :            : 
     180                 :            :                     // move the item to the front of the list (header)
     181                 :        229 :                     list_remove(&header->items, &item->list_elem);
     182                 :        229 :                     list_push_front(&header->items, &item->list_elem);
     183                 :        229 :                     break;
     184                 :            :                 }
     185                 :      11825 :                 le = list_next(le);
     186                 :            :             }
     187                 :            :         } else {
     188                 :            :             // insertion by compactor
     189                 :            :             // check whether there is a committed entry
     190                 :      39570 :             le = list_end(&header->items);
     191         [ +  - ]:      39570 :             if (le) {
     192                 :      39570 :                 item = _get_entry(le, struct wal_item, list_elem);
     193         [ +  + ]:      39570 :                 if (!(item->flag & WAL_ITEM_COMMITTED)) {
     194                 :            :                     // there is no committed entry .. insert the entry from compactor
     195                 :          8 :                     le = NULL;
     196                 :            :                     // increase num_docs
     197                 :            :                     // (if committed entry already exists,
     198                 :            :                     //  num_docs doesn't need to be increased)
     199                 :          8 :                     _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, 1);
     200                 :            :                 }
     201                 :            :             }
     202                 :            :         }
     203                 :            : 
     204         [ +  + ]:      51604 :         if (le == NULL) {
     205                 :            :             // not exist
     206                 :            :             // create new item
     207                 :      11813 :             item = (struct wal_item *)malloc(sizeof(struct wal_item));
     208         [ +  + ]:      11813 :             if (is_compactor) {
     209                 :          8 :                 item->flag = WAL_ITEM_COMMITTED | WAL_ITEM_BY_COMPACTOR;
     210                 :            :             } else {
     211                 :      11805 :                 item->flag = 0x0;
     212                 :            :             }
     213         [ +  - ]:      11813 :             if (file->kv_header) { // multi KV instance mode
     214                 :      11813 :                 item->flag |= WAL_ITEM_MULTI_KV_INS_MODE;
     215                 :            :             }
     216                 :      11813 :             item->txn = txn;
     217         [ +  + ]:      11813 :             if (txn == &file->global_txn) {
     218                 :       8173 :                 file->wal->num_flushable++;
     219                 :            :             }
     220                 :      11813 :             item->header = header;
     221                 :            : 
     222                 :      11813 :             item->seqnum = doc->seqnum;
     223         [ +  + ]:      11813 :             item->action = doc->deleted ? WAL_ACT_LOGICAL_REMOVE : WAL_ACT_INSERT;
     224                 :      11813 :             item->offset = offset;
     225                 :      11813 :             item->doc_size = doc->size_ondisk;
     226                 :      11813 :             file->wal->datasize += doc->size_ondisk;
     227                 :            : 
     228                 :      11813 :             hash_insert(&file->wal->hash_byseq, &item->he_seq);
     229         [ +  + ]:      11813 :             if (!is_compactor) {
     230                 :            :                 // insert into header's list
     231                 :      11805 :                 list_push_front(&header->items, &item->list_elem);
     232                 :            :                 // also insert into transaction's list
     233                 :      11805 :                 list_push_back(txn->items, &item->list_elem_txn);
     234                 :            :             } else {
     235                 :            :                 // compactor
     236                 :            :                 // always push back because it is already committed
     237                 :          8 :                 list_push_back(&header->items, &item->list_elem);
     238                 :            :             }
     239                 :      11813 :             file->wal->size++;
     240                 :            :         }
     241                 :            :     } else {
     242                 :            :         // not exist .. create new one
     243                 :            :         // create new header and new item
     244                 :    4280905 :         header = (struct wal_item_header*)malloc(sizeof(struct wal_item_header));
     245                 :    4280905 :         list_init(&header->items);
     246                 :    4280907 :         header->keylen = keylen;
     247                 :    4280907 :         header->key = (void *)malloc(header->keylen);
     248                 :    4280907 :         memcpy(header->key, key, header->keylen);
     249                 :    4280907 :         hash_insert(&file->wal->hash_bykey, &header->he_key);
     250                 :            : 
     251                 :    4280906 :         item = (struct wal_item *)malloc(sizeof(struct wal_item));
     252                 :            :         // entries inserted by compactor is already committed
     253         [ +  + ]:    4280906 :         if (is_compactor) {
     254                 :    1840769 :             item->flag = WAL_ITEM_COMMITTED | WAL_ITEM_BY_COMPACTOR;
     255                 :            :         } else {
     256                 :    2440137 :             item->flag = 0x0;
     257                 :            :         }
     258         [ +  + ]:    4280906 :         if (file->kv_header) { // multi KV instance mode
     259                 :    4279880 :             item->flag |= WAL_ITEM_MULTI_KV_INS_MODE;
     260                 :            :         }
     261                 :    4280906 :         item->txn = txn;
     262         [ +  + ]:    4280906 :         if (txn == &file->global_txn) {
     263                 :    4163295 :             file->wal->num_flushable++;
     264                 :            :         }
     265                 :    4280906 :         item->header = header;
     266                 :            : 
     267                 :    4280906 :         item->seqnum = doc->seqnum;
     268         [ +  + ]:    4280906 :         item->action = doc->deleted ? WAL_ACT_LOGICAL_REMOVE : WAL_ACT_INSERT;
     269                 :    4280906 :         item->offset = offset;
     270                 :    4280906 :         item->doc_size = doc->size_ondisk;
     271                 :    4280906 :         file->wal->datasize += doc->size_ondisk;
     272                 :    4280906 :         hash_insert(&file->wal->hash_byseq, &item->he_seq);
     273                 :            :         // insert into header's list
     274                 :            :         // (pushing front is ok for compactor because no other item already exists)
     275                 :    4280906 :         list_push_front(&header->items, &item->list_elem);
     276         [ +  + ]:    4280907 :         if (!is_compactor) {
     277                 :            :             // also insert into transaction's list
     278                 :    2440137 :             list_push_back(txn->items, &item->list_elem_txn);
     279                 :            :         } else {
     280                 :            :             // increase num_docs
     281                 :    1840770 :             _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, 1);
     282                 :            :         }
     283                 :            : 
     284                 :            :         // insert header into wal global list
     285                 :    4280907 :         list_push_back(&file->wal->list, &header->list_elem);
     286                 :    4280906 :         ++file->wal->size;
     287                 :            :     }
     288                 :            : 
     289                 :    4332510 :     spin_unlock(&file->wal->lock);
     290                 :            : 
     291                 :    4332510 :     return FDB_RESULT_SUCCESS;
     292                 :            : }
     293                 :            : 
     294                 :    2452159 : fdb_status wal_insert(fdb_txn *txn, struct filemgr *file, fdb_doc *doc, uint64_t offset)
     295                 :            : {
     296                 :    2452159 :     return _wal_insert(txn, file, doc, offset, 0);
     297                 :            : }
     298                 :            : 
     299                 :    1880340 : fdb_status wal_insert_by_compactor(fdb_txn *txn,
     300                 :            :                                    struct filemgr *file,
     301                 :            :                                    fdb_doc *doc,
     302                 :            :                                    uint64_t offset)
     303                 :            : {
     304                 :    1880340 :     return _wal_insert(txn, file, doc, offset, 1);
     305                 :            : }
     306                 :            : 
     307                 :    1060133 : static fdb_status _wal_find(fdb_txn *txn,
     308                 :            :                             struct filemgr *file,
     309                 :            :                             fdb_kvs_id_t kv_id,
     310                 :            :                             fdb_doc *doc,
     311                 :            :                             uint64_t *offset)
     312                 :            : {
     313                 :    1060133 :     struct wal_item item_query, *item = NULL;
     314                 :    1060133 :     struct wal_item_header query, *header = NULL;
     315                 :    1060133 :     struct list_elem *le = NULL;
     316                 :    1060133 :     struct hash_elem *he = NULL;
     317                 :    1060133 :     void *key = doc->key;
     318                 :    1060133 :     size_t keylen = doc->keylen;
     319                 :            : 
     320                 :    1060133 :     spin_lock(&file->wal->lock);
     321                 :            : 
     322 [ +  - ][ +  + ]:    1060606 :     if (doc->seqnum == SEQNUM_NOT_USED || (key && keylen>0)) {
                 [ +  + ]
     323                 :            :         // search by key
     324                 :    1058883 :         query.key = key;
     325                 :    1058883 :         query.keylen = keylen;
     326                 :    1058883 :         he = hash_find(&file->wal->hash_bykey, &query.he_key);
     327         [ +  + ]:    1059501 :         if (he) {
     328                 :            :             // retrieve header
     329                 :     247493 :             header = _get_entry(he, struct wal_item_header, he_key);
     330                 :     247493 :             le = list_begin(&header->items);
     331         [ +  + ]:     248191 :             while(le) {
     332                 :     247573 :                 item = _get_entry(le, struct wal_item, list_elem);
     333                 :            :                 // only committed items can be seen by the other handles, OR
     334                 :            :                 // items belonging to the same txn can be found, OR
     335                 :            :                 // a transaction's isolation level is read uncommitted.
     336 [ +  + ][ +  + ]:     247573 :                 if ((item->flag & WAL_ITEM_COMMITTED) ||
                 [ +  + ]
     337                 :            :                     (item->txn == txn) ||
     338                 :            :                     (txn->isolation == FDB_ISOLATION_READ_UNCOMMITTED)) {
     339                 :     246875 :                     *offset = item->offset;
     340         [ +  + ]:     246875 :                     if (item->action == WAL_ACT_INSERT) {
     341                 :     246871 :                         doc->deleted = false;
     342                 :            :                     } else {
     343                 :          4 :                         doc->deleted = true;
     344                 :            :                     }
     345                 :     246875 :                     spin_unlock(&file->wal->lock);
     346                 :     246875 :                     return FDB_RESULT_SUCCESS;
     347                 :            :                 }
     348                 :        698 :                 le = list_next(le);
     349                 :            :             }
     350                 :            :         }
     351                 :            :     } else {
     352                 :            :         // search by seqnum
     353                 :            :         fdb_kvs_id_t _kv_id;
     354                 :            :         struct wal_item_header temp_header;
     355                 :            : 
     356         [ +  - ]:       1723 :         if (file->kv_header) { // multi KV instance mode
     357                 :       1723 :             _kv_id = _endian_encode(kv_id);
     358                 :       1723 :             temp_header.key = (void *)&_kv_id;
     359                 :       1723 :             item_query.header = &temp_header;
     360                 :            :         }
     361                 :       1723 :         item_query.seqnum = doc->seqnum;
     362                 :       1723 :         he = hash_find(&file->wal->hash_byseq, &item_query.he_seq);
     363         [ +  + ]:       1723 :         if (he) {
     364                 :        502 :             item = _get_entry(he, struct wal_item, he_seq);
     365 [ +  + ][ -  + ]:        502 :             if ((item->flag & WAL_ITEM_COMMITTED) ||
                 [ #  # ]
     366                 :            :                 (item->txn == txn) ||
     367                 :            :                 (txn->isolation == FDB_ISOLATION_READ_UNCOMMITTED)) {
     368                 :        502 :                 *offset = item->offset;
     369         [ +  - ]:        502 :                 if (item->action == WAL_ACT_INSERT) {
     370                 :        502 :                     doc->deleted = false;
     371                 :            :                 } else {
     372                 :          0 :                     doc->deleted = true;
     373                 :            :                 }
     374                 :        502 :                 spin_unlock(&file->wal->lock);
     375                 :        502 :                 return FDB_RESULT_SUCCESS;
     376                 :            :             }
     377                 :            :         }
     378                 :            :     }
     379                 :            : 
     380                 :     813229 :     spin_unlock(&file->wal->lock);
     381                 :    1060115 :     return FDB_RESULT_KEY_NOT_FOUND;
     382                 :            : }
     383                 :            : 
     384                 :    1056588 : fdb_status wal_find(fdb_txn *txn, struct filemgr *file, fdb_doc *doc, uint64_t *offset)
     385                 :            : {
     386                 :    1056588 :     return _wal_find(txn, file, 0, doc, offset);
     387                 :            : }
     388                 :            : 
     389                 :       1723 : fdb_status wal_find_kv_id(fdb_txn *txn,
     390                 :            :                           struct filemgr *file,
     391                 :            :                           fdb_kvs_id_t kv_id,
     392                 :            :                           fdb_doc *doc,
     393                 :            :                           uint64_t *offset)
     394                 :            : {
     395                 :       1723 :     return _wal_find(txn, file, kv_id, doc, offset);
     396                 :            : }
     397                 :            : 
     398                 :            : // move all uncommitted items into 'new_file'
     399                 :         53 : fdb_status wal_txn_migration(void *dbhandle,
     400                 :            :                              void *new_dhandle,
     401                 :            :                              struct filemgr *old_file,
     402                 :            :                              struct filemgr *new_file,
     403                 :            :                              wal_doc_move_func *move_doc)
     404                 :            : {
     405                 :            :     uint64_t offset;
     406                 :            :     fdb_doc doc;
     407                 :            :     fdb_txn *txn;
     408                 :            :     struct wal_txn_wrapper *txn_wrapper;
     409                 :            :     struct wal_item_header *header;
     410                 :            :     struct wal_item *item;
     411                 :            :     struct list_elem *e1, *e2;
     412                 :            : 
     413                 :         53 :     spin_lock(&old_file->wal->lock);
     414                 :            : 
     415                 :         53 :     e1 = list_begin(&old_file->wal->list);
     416         [ +  + ]:      25469 :     while(e1) {
     417                 :      25416 :         header = _get_entry(e1, struct wal_item_header, list_elem);
     418                 :            : 
     419                 :      25416 :         e2 = list_end(&header->items);
     420         [ +  + ]:      50833 :         while(e2) {
     421                 :      25417 :             item = _get_entry(e2, struct wal_item, list_elem);
     422         [ +  + ]:      25417 :             if (!(item->flag & WAL_ITEM_COMMITTED)) {
     423                 :            :                 // not committed yet
     424                 :            :                 // move doc
     425                 :         12 :                 offset = move_doc(dbhandle, new_dhandle, item, &doc);
     426                 :            :                 // insert into new_file's WAL
     427                 :         12 :                 _wal_insert(item->txn, new_file, &doc, offset, 0);
     428                 :            :                 // remove from seq hash table
     429                 :         12 :                 hash_remove(&old_file->wal->hash_byseq, &item->he_seq);
     430                 :            :                 // remove from header's list
     431                 :         12 :                 e2 = list_remove_reverse(&header->items, e2);
     432                 :            :                 // remove from transaction's list
     433                 :         12 :                 list_remove(item->txn->items, &item->list_elem_txn);
     434                 :            :                 // decrease num_flushable of old_file if non-transactional update
     435         [ -  + ]:         12 :                 if (item->txn == &old_file->global_txn) {
     436                 :          0 :                     old_file->wal->num_flushable--;
     437                 :            :                 }
     438         [ +  - ]:         12 :                 if (item->action != WAL_ACT_REMOVE) {
     439                 :         12 :                     old_file->wal->datasize -= item->doc_size;
     440                 :            :                 }
     441                 :            :                 // free item
     442                 :         12 :                 free(item);
     443                 :            :                 // free doc
     444                 :         12 :                 free(doc.key);
     445                 :         12 :                 free(doc.meta);
     446                 :         12 :                 free(doc.body);
     447                 :         12 :                 old_file->wal->size--;
     448                 :            :             } else {
     449                 :      25405 :                 e2= list_prev(e2);
     450                 :            :             }
     451                 :            :         }
     452                 :            : 
     453         [ +  + ]:      25416 :         if (list_begin(&header->items) == NULL) {
     454                 :            :             // header's list becomes empty
     455                 :            :             // remove from key hash table
     456                 :         11 :             hash_remove(&old_file->wal->hash_bykey, &header->he_key);
     457                 :            :             // remove from wal list
     458                 :         11 :             e1 = list_remove(&old_file->wal->list, &header->list_elem);
     459                 :            :             // free key & header
     460                 :         11 :             free(header->key);
     461                 :         11 :             free(header);
     462                 :            :         } else {
     463                 :      25405 :             e1 = list_next(e1);
     464                 :            :         }
     465                 :            :     }
     466                 :            : 
     467                 :            :     // migrate all entries in txn list
     468                 :         53 :     e1 = list_begin(&old_file->wal->txn_list);
     469         [ +  + ]:        112 :     while(e1) {
     470                 :         59 :         txn_wrapper = _get_entry(e1, struct wal_txn_wrapper, le);
     471                 :         59 :         txn = txn_wrapper->txn;
     472                 :            :         // except for global_txn
     473         [ +  + ]:         59 :         if (txn != &old_file->global_txn) {
     474                 :          6 :             e1 = list_remove(&old_file->wal->txn_list, &txn_wrapper->le);
     475                 :          6 :             list_push_front(&new_file->wal->txn_list, &txn_wrapper->le);
     476                 :            :             // remove previous header info
     477                 :          6 :             txn->prev_hdr_bid = BLK_NOT_FOUND;
     478                 :            :         } else {
     479                 :         53 :             e1 = list_next(e1);
     480                 :            :         }
     481                 :            :     }
     482                 :            : 
     483                 :         53 :     spin_unlock(&old_file->wal->lock);
     484                 :            : 
     485                 :         53 :     return FDB_RESULT_SUCCESS;
     486                 :            : }
     487                 :            : 
     488                 :      18006 : fdb_status wal_commit(fdb_txn *txn, struct filemgr *file,
     489                 :            :                       wal_commit_mark_func *func)
     490                 :            : {
     491                 :            :     int prev_commit;
     492                 :            :     wal_item_action prev_action;
     493                 :            :     struct wal_item *item;
     494                 :            :     struct wal_item *_item;
     495                 :            :     struct list_elem *e1, *e2;
     496                 :            :     fdb_kvs_id_t kv_id, *_kv_id;
     497                 :            :     fdb_status status;
     498                 :            : 
     499                 :      18006 :     spin_lock(&file->wal->lock);
     500                 :            : 
     501                 :      18006 :     e1 = list_begin(txn->items);
     502         [ +  + ]:    2469302 :     while(e1) {
     503                 :    2451296 :         item = _get_entry(e1, struct wal_item, list_elem_txn);
     504         [ -  + ]:    2451296 :         assert(item->txn == txn);
     505                 :            : 
     506         [ +  - ]:    2451296 :         if (!(item->flag & WAL_ITEM_COMMITTED)) {
     507                 :            :             // get KVS ID
     508         [ +  + ]:    2451296 :             if (item->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
     509                 :    2450270 :                 _kv_id = (fdb_kvs_id_t*)item->header->key;
     510                 :    2450270 :                 kv_id = _endian_decode(*_kv_id);
     511                 :            :             } else {
     512                 :       1026 :                 kv_id = 0;
     513                 :            :             }
     514                 :            : 
     515                 :    2451296 :             item->flag |= WAL_ITEM_COMMITTED;
     516                 :            :             // append commit mark if necessary
     517         [ +  + ]:    2451296 :             if (func) {
     518                 :     120630 :                 status = func(txn->handle, item->offset);
     519         [ -  + ]:     120630 :                 if (status != FDB_RESULT_SUCCESS) {
     520                 :          0 :                     spin_unlock(&file->wal->lock);
     521                 :          0 :                     return status;
     522                 :            :                 }
     523                 :            :             }
     524                 :            :             // remove previously committed item
     525                 :    2451296 :             prev_commit = 0;
     526                 :            :             // next item on the wal_item_header's items
     527                 :    2451296 :             e2 = list_next(&item->list_elem);
     528         [ +  + ]:    2462518 :             while(e2) {
     529                 :      11222 :                 _item = _get_entry(e2, struct wal_item, list_elem);
     530                 :      11222 :                 e2 = list_next(e2);
     531                 :            :                 // committed but not flush-ready
     532                 :            :                 // (flush-readied item will be removed by flushing)
     533 [ +  + ][ +  + ]:      11222 :                 if ((_item->flag & WAL_ITEM_COMMITTED) &&
     534                 :      11208 :                     !(_item->flag & WAL_ITEM_FLUSH_READY)) {
     535                 :       8860 :                     list_remove(&item->header->items, &_item->list_elem);
     536                 :       8860 :                     hash_remove(&file->wal->hash_byseq, &_item->he_seq);
     537                 :       8860 :                     prev_action = _item->action;
     538                 :       8860 :                     prev_commit = 1;
     539                 :       8860 :                     file->wal->size--;
     540                 :       8860 :                     file->wal->num_flushable--;
     541         [ +  - ]:       8860 :                     if (item->action != WAL_ACT_REMOVE) {
     542                 :       8860 :                         file->wal->datasize -= _item->doc_size;
     543                 :            :                     }
     544                 :       8860 :                     free(_item);
     545                 :            :                 }
     546                 :            :             }
     547         [ +  + ]:    2451296 :             if (!prev_commit) {
     548                 :            :                 // there was no previous commit .. increase num_docs
     549                 :    2442436 :                 _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, 1);
     550         [ +  + ]:    2442436 :                 if (item->action == WAL_ACT_LOGICAL_REMOVE) {
     551                 :        526 :                     _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, 1);
     552                 :            :                 }
     553                 :            :             } else {
     554 [ +  + ][ +  + ]:       8860 :                 if (prev_action == WAL_ACT_INSERT &&
     555                 :            :                     item->action == WAL_ACT_LOGICAL_REMOVE) {
     556                 :        204 :                     _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, 1);
     557 [ +  + ][ +  - ]:       8656 :                 } else if (prev_action == WAL_ACT_LOGICAL_REMOVE &&
     558                 :            :                            item->action == WAL_ACT_INSERT) {
     559                 :        100 :                     _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
     560                 :            :                 }
     561                 :            :             }
     562                 :            :             // increase num_flushable if it is transactional update
     563         [ +  + ]:    2451296 :             if (item->txn != &file->global_txn) {
     564                 :     120630 :                 file->wal->num_flushable++;
     565                 :            :             }
     566                 :            :             // move the committed item to the end of the wal_item_header's list
     567                 :    2451296 :             list_remove(&item->header->items, &item->list_elem);
     568                 :    2451296 :             list_push_back(&item->header->items, &item->list_elem);
     569                 :            :         }
     570                 :            : 
     571                 :            :         // remove from transaction's list
     572                 :    2451296 :         e1 = list_remove(txn->items, e1);
     573                 :            :     }
     574                 :            : 
     575                 :      18006 :     spin_unlock(&file->wal->lock);
     576                 :      18006 :     return FDB_RESULT_SUCCESS;
     577                 :            : }
     578                 :            : 
     579                 :   59094065 : int _wal_flush_cmp(struct avl_node *a, struct avl_node *b, void *aux)
     580                 :            : {
     581                 :            :     struct wal_item *aa, *bb;
     582                 :   59094065 :     aa = _get_entry(a, struct wal_item, avl);
     583                 :   59094065 :     bb = _get_entry(b, struct wal_item, avl);
     584                 :            : 
     585         [ +  + ]:   59094065 :     if (aa->old_offset < bb->old_offset) {
     586                 :    4780111 :         return -1;
     587         [ +  + ]:   54313954 :     } else if (aa->old_offset > bb->old_offset) {
     588                 :    1115446 :         return 1;
     589                 :            :     } else {
     590                 :            :         // old_offset can be 0 if the document was newly inserted
     591         [ +  + ]:   53198508 :         if (aa->offset < bb->offset) {
     592                 :   53061058 :             return -1;
     593         [ +  - ]:     137450 :         } else if (aa->offset > bb->offset) {
     594                 :     137450 :             return 1;
     595                 :            :         } else {
     596                 :   59094065 :             return 0;
     597                 :            :         }
     598                 :            :     }
     599                 :            : }
     600                 :            : 
     601                 :        967 : fdb_status wal_release_flushed_items(struct filemgr *file,
     602                 :            :                                      struct avl_tree *flush_items)
     603                 :            : {
     604                 :        967 :     struct avl_tree *tree = flush_items;
     605                 :            :     struct avl_node *a;
     606                 :            :     struct wal_item *item;
     607                 :            :     fdb_kvs_id_t kv_id, *_kv_id;
     608                 :            : 
     609                 :            :     // scan and remove entries in the avl-tree
     610                 :        967 :     spin_lock(&file->wal->lock);
     611                 :    4269025 :     while (1) {
     612         [ +  + ]:    4269992 :         if ((a = avl_first(tree)) == NULL) {
     613                 :        967 :             break;
     614                 :            :         }
     615                 :    4269025 :         item = _get_entry(a, struct wal_item, avl);
     616                 :    4269025 :         avl_remove(tree, &item->avl);
     617                 :            : 
     618                 :            :         // get KVS ID
     619         [ +  + ]:    4268553 :         if (item->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
     620                 :    4267538 :             _kv_id = (fdb_kvs_id_t*)item->header->key;
     621                 :    4267538 :             kv_id = _endian_decode(*_kv_id);
     622                 :            :         } else {
     623                 :       1015 :             kv_id = 0;
     624                 :            :         }
     625                 :            : 
     626                 :    4268553 :         list_remove(&item->header->items, &item->list_elem);
     627                 :    4268559 :         hash_remove(&file->wal->hash_byseq, &item->he_seq);
     628         [ +  + ]:    4268627 :         if (list_begin(&item->header->items) == NULL) {
     629                 :            :             // wal_item_header becomes empty
     630                 :            :             // free header and remove from hash table & wal list
     631                 :    4266277 :             list_remove(&file->wal->list, &item->header->list_elem);
     632                 :    4266210 :             hash_remove(&file->wal->hash_bykey, &item->header->he_key);
     633                 :    4266658 :             free(item->header->key);
     634                 :    4266658 :             free(item->header);
     635                 :            :         }
     636                 :            : 
     637 [ +  + ][ -  + ]:    4269010 :         if (item->action == WAL_ACT_LOGICAL_REMOVE ||
     638                 :            :             item->action == WAL_ACT_REMOVE) {
     639                 :        595 :             _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
     640                 :            :         }
     641                 :    4269027 :         _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, -1);
     642                 :    4269025 :         file->wal->size--;
     643                 :    4269025 :         file->wal->num_flushable--;
     644         [ +  # ]:    4269025 :         if (item->action != WAL_ACT_REMOVE) {
     645                 :    4269028 :             file->wal->datasize -= item->doc_size;
     646                 :            :         }
     647                 :    4269025 :         free(item);
     648                 :            :     }
     649                 :        967 :     spin_unlock(&file->wal->lock);
     650                 :            : 
     651                 :        967 :     return FDB_RESULT_SUCCESS;
     652                 :            : }
     653                 :            : 
     654                 :        968 : fdb_status _wal_flush(struct filemgr *file,
     655                 :            :                      void *dbhandle,
     656                 :            :                      wal_flush_func *flush_func,
     657                 :            :                      wal_get_old_offset_func *get_old_offset,
     658                 :            :                      struct avl_tree *flush_items,
     659                 :            :                      bool by_compactor)
     660                 :            : {
     661                 :        968 :     struct avl_tree *tree = flush_items;
     662                 :            :     struct avl_node *a;
     663                 :            :     struct list_elem *e, *ee;
     664                 :            :     struct wal_item *item;
     665                 :            :     struct wal_item_header *header;
     666                 :            : 
     667                 :            :     // sort by old byte-offset of the document (for sequential access)
     668                 :        968 :     spin_lock(&file->wal->lock);
     669                 :        968 :     avl_init(tree, NULL);
     670                 :        968 :     e = list_begin(&file->wal->list);
     671         [ +  + ]:    4313322 :     while(e){
     672                 :    4312354 :         header = _get_entry(e, struct wal_item_header, list_elem);
     673                 :    4312354 :         ee = list_end(&header->items);
     674         [ +  + ]:    8583468 :         while(ee) {
     675                 :    4312358 :             item = _get_entry(ee, struct wal_item, list_elem);
     676                 :            :             // committed but not flushed items
     677         [ +  + ]:    4312358 :             if (!(item->flag & WAL_ITEM_COMMITTED)) {
     678                 :         45 :                 break;
     679                 :            :             }
     680 [ +  + ][ +  + ]:    4312313 :             if (by_compactor &&
     681                 :    1880340 :                 !(item->flag & WAL_ITEM_BY_COMPACTOR)) {
     682                 :            :                 // during compaction, do not flush normally committed item
     683                 :      41199 :                 break;
     684                 :            :             }
     685         [ +  - ]:    4271114 :             if (!(item->flag & WAL_ITEM_FLUSH_READY)) {
     686                 :    4271114 :                 item->flag |= WAL_ITEM_FLUSH_READY;
     687                 :            :                 // if WAL_ITEM_FLUSH_READY flag is set,
     688                 :            :                 // this item becomes immutable, so that
     689                 :            :                 // no other concurrent thread modifies it.
     690                 :    4271114 :                 spin_unlock(&file->wal->lock);
     691                 :    4271114 :                 item->old_offset = get_old_offset(dbhandle, item);
     692                 :    4271114 :                 avl_insert(tree, &item->avl, _wal_flush_cmp);
     693                 :    4271114 :                 spin_lock(&file->wal->lock);
     694                 :            :             }
     695                 :    4271114 :             ee = list_prev(ee);
     696                 :            :         }
     697                 :    4312354 :         e = list_next(e);
     698                 :            :     }
     699                 :        968 :     spin_unlock(&file->wal->lock);
     700                 :            : 
     701                 :            :     // scan and flush entries in the avl-tree
     702                 :        968 :     a = avl_first(tree);
     703         [ +  + ]:    4269998 :     while (a) {
     704                 :    4269031 :         item = _get_entry(a, struct wal_item, avl);
     705                 :    4269031 :         a = avl_next(a);
     706                 :            : 
     707                 :            :         // check weather this item is updated after insertion into tree
     708         [ +  - ]:    4269029 :         if (item->flag & WAL_ITEM_FLUSH_READY) {
     709                 :    4269030 :             fdb_status fs = flush_func(dbhandle, item);
     710         [ +  + ]:    4269030 :             if (fs != FDB_RESULT_SUCCESS) {
     711                 :          1 :                 return fs;
     712                 :            :             }
     713                 :            :         }
     714                 :            :     }
     715                 :            : 
     716                 :        968 :     return FDB_RESULT_SUCCESS;
     717                 :            : }
     718                 :            : 
     719                 :        916 : fdb_status wal_flush(struct filemgr *file,
     720                 :            :                      void *dbhandle,
     721                 :            :                      wal_flush_func *flush_func,
     722                 :            :                      wal_get_old_offset_func *get_old_offset,
     723                 :            :                      struct avl_tree *flush_items)
     724                 :            : {
     725                 :            :     return _wal_flush(file, dbhandle, flush_func, get_old_offset,
     726                 :        916 :                       flush_items, false);
     727                 :            : }
     728                 :            : 
     729                 :         52 : fdb_status wal_flush_by_compactor(struct filemgr *file,
     730                 :            :                                   void *dbhandle,
     731                 :            :                                   wal_flush_func *flush_func,
     732                 :            :                                   wal_get_old_offset_func *get_old_offset,
     733                 :            :                                   struct avl_tree *flush_items)
     734                 :            : {
     735                 :            :     return _wal_flush(file, dbhandle, flush_func, get_old_offset,
     736                 :         52 :                       flush_items, true);
     737                 :            : }
     738                 :            : 
     739                 :            : // Used to copy all the WAL items for non-durable snapshots
     740                 :          2 : fdb_status wal_snapshot(struct filemgr *file,
     741                 :            :                         void *dbhandle, fdb_txn *txn,
     742                 :            :                         wal_snapshot_func *snapshot_func)
     743                 :            : {
     744                 :            :     struct list_elem *e, *ee;
     745                 :            :     struct wal_item *item;
     746                 :            :     struct wal_item_header *header;
     747                 :            : 
     748                 :          2 :     spin_lock(&file->wal->lock);
     749                 :          2 :     e = list_begin(&file->wal->list);
     750         [ +  + ]:         12 :     while(e){
     751                 :         10 :         header = _get_entry(e, struct wal_item_header, list_elem);
     752                 :         10 :         ee = list_begin(&header->items);
     753         [ +  - ]:         10 :         while(ee) {
     754                 :         10 :             item = _get_entry(ee, struct wal_item, list_elem);
     755 [ +  + ][ -  + ]:         10 :             if (!(item->flag & WAL_ITEM_COMMITTED) && // Skip uncommitted items
                 [ #  # ]
     756                 :            :                 item->txn != &file->global_txn && // that aren't part of global
     757                 :            :                 item->txn != txn) { // nor current transaction
     758                 :          0 :                 ee = list_next(ee);
     759                 :          0 :                 continue;
     760                 :            :             }
     761                 :            :             fdb_doc doc;
     762                 :         10 :             doc.keylen = item->header->keylen;
     763                 :         10 :             doc.key = malloc(doc.keylen); // (freed in fdb_snapshot_close)
     764                 :         10 :             memcpy(doc.key, item->header->key, doc.keylen);
     765                 :         10 :             doc.seqnum = item->seqnum;
     766                 :            :             doc.deleted = (item->action == WAL_ACT_LOGICAL_REMOVE ||
     767 [ +  - ][ -  + ]:         10 :                     item->action == WAL_ACT_REMOVE);
     768                 :         10 :             snapshot_func(dbhandle, &doc, item->offset);
     769                 :         10 :             break; // We just require a single latest copy in the snapshot
     770                 :            :         }
     771                 :         10 :         e = list_next(e);
     772                 :            :     }
     773                 :          2 :     spin_unlock(&file->wal->lock);
     774                 :            : 
     775                 :          2 :     return FDB_RESULT_SUCCESS;
     776                 :            : }
     777                 :            : 
     778                 :            : // discard entries in txn
     779                 :          7 : fdb_status wal_discard(struct filemgr *file, fdb_txn *txn)
     780                 :            : {
     781                 :            :     struct wal_item *item;
     782                 :            :     struct list_elem *e;
     783                 :            : 
     784                 :          7 :     spin_lock(&file->wal->lock);
     785                 :            : 
     786                 :          7 :     e = list_begin(txn->items);
     787         [ +  + ]:        616 :     while(e) {
     788                 :        609 :         item = _get_entry(e, struct wal_item, list_elem_txn);
     789                 :            : 
     790                 :            :         // remove from seq hash table
     791                 :        609 :         hash_remove(&file->wal->hash_byseq, &item->he_seq);
     792                 :            :         // remove from header's list
     793                 :        609 :         list_remove(&item->header->items, &item->list_elem);
     794                 :            :         // remove header if empty
     795         [ +  + ]:        609 :         if (list_begin(&item->header->items) == NULL) {
     796                 :            :             //remove from key hash table
     797                 :          9 :             hash_remove(&file->wal->hash_bykey, &item->header->he_key);
     798                 :            :             // remove from wal list
     799                 :          9 :             list_remove(&file->wal->list, &item->header->list_elem);
     800                 :            :             // free key and header
     801                 :          9 :             free(item->header->key);
     802                 :          9 :             free(item->header);
     803                 :            :         }
     804                 :            :         // remove from txn's list
     805                 :        609 :         e = list_remove(txn->items, e);
     806 [ +  - ][ -  + ]:        609 :         if (item->txn == &file->global_txn ||
     807                 :            :             item->flag & WAL_ITEM_COMMITTED) {
     808                 :          0 :             file->wal->num_flushable--;
     809                 :            :         }
     810         [ +  - ]:        609 :         if (item->action != WAL_ACT_REMOVE) {
     811                 :        609 :             file->wal->datasize -= item->doc_size;
     812                 :            :         }
     813                 :            :         // free
     814                 :        609 :         free(item);
     815                 :        609 :         file->wal->size--;
     816                 :            :     }
     817                 :            : 
     818                 :          7 :     spin_unlock(&file->wal->lock);
     819                 :          7 :     return FDB_RESULT_SUCCESS;
     820                 :            : }
     821                 :            : 
     822                 :            : typedef enum wal_discard_type {
     823                 :            :     WAL_DISCARD_UNCOMMITTED_ONLY,
     824                 :            :     WAL_DISCARD_ALL,
     825                 :            :     WAL_DISCARD_KV_INS,
     826                 :            : } wal_discard_t;
     827                 :            : 
     828                 :            : // discard all entries
     829                 :        521 : fdb_status _wal_close(struct filemgr *file,
     830                 :            :                       wal_discard_t type, void *aux)
     831                 :            : {
     832                 :            :     struct wal_item *item;
     833                 :            :     struct wal_item_header *header;
     834                 :            :     struct list_elem *e1, *e2;
     835                 :            :     fdb_kvs_id_t *_kv_id, kv_id, kv_id_req;
     836                 :            :     bool committed;
     837                 :            :     wal_item_action committed_item_action;
     838                 :            : 
     839         [ +  + ]:        521 :     if (type == WAL_DISCARD_KV_INS) { // multi KV ins mode
     840         [ -  + ]:         22 :         if (aux == NULL) { // aux must contain pointer to KV ID
     841                 :          0 :             return FDB_RESULT_INVALID_ARGS;
     842                 :            :         }
     843                 :         22 :         kv_id_req = *(fdb_kvs_id_t*)aux;
     844                 :            :     }
     845                 :            : 
     846                 :        521 :     spin_lock(&file->wal->lock);
     847                 :            : 
     848                 :        521 :     e1 = list_begin(&file->wal->list);
     849         [ +  + ]:      28496 :     while(e1){
     850                 :      27975 :         header = _get_entry(e1, struct wal_item_header, list_elem);
     851                 :            : 
     852         [ +  + ]:      27975 :         if (type == WAL_DISCARD_KV_INS) { // multi KV ins mode
     853                 :       1808 :             _kv_id = (fdb_kvs_id_t *)header->key;
     854                 :       1808 :             kv_id = _endian_decode(*_kv_id);
     855                 :            :             // begin while loop only on matching KV ID
     856         [ +  + ]:       1808 :             e2 = (kv_id == kv_id_req)?(list_begin(&header->items)):(NULL);
     857                 :            :         } else {
     858                 :      26167 :             kv_id = 0;
     859                 :      26167 :             e2 = list_begin(&header->items);
     860                 :            :         }
     861                 :            : 
     862                 :      27975 :         committed = false;
     863         [ +  + ]:      55251 :         while(e2) {
     864                 :      27276 :             item = _get_entry(e2, struct wal_item, list_elem);
     865                 :            : 
     866 [ +  + ][ +  + ]:      27276 :             if ( type == WAL_DISCARD_ALL ||
         [ +  + ][ +  + ]
     867                 :            :                 (type == WAL_DISCARD_UNCOMMITTED_ONLY &&
     868                 :      13094 :                     !(item->flag & WAL_ITEM_COMMITTED)) ||
     869                 :            :                  type == WAL_DISCARD_KV_INS) {
     870                 :            :                 // remove from header's list
     871                 :      14207 :                 e2 = list_remove(&header->items, e2);
     872         [ +  + ]:      14207 :                 if (!(item->flag & WAL_ITEM_COMMITTED)) {
     873                 :            :                     // and also remove from transaction's list
     874                 :         25 :                     list_remove(item->txn->items, &item->list_elem_txn);
     875                 :            :                 } else {
     876                 :            :                     // committed item exists and will be removed
     877                 :      14182 :                     committed = true;
     878                 :      14182 :                     committed_item_action = item->action;
     879                 :            :                 }
     880                 :            :                 // remove from seq hash table
     881                 :      14207 :                 hash_remove(&file->wal->hash_byseq, &item->he_seq);
     882                 :            : 
     883         [ +  - ]:      14207 :                 if (item->action != WAL_ACT_REMOVE) {
     884                 :      14207 :                     file->wal->datasize -= item->doc_size;
     885                 :            :                 }
     886         [ +  + ]:      14207 :                 if (item->txn == &file->global_txn) {
     887                 :      13597 :                     file->wal->num_flushable--;
     888                 :            :                 }
     889                 :            : 
     890                 :      14207 :                 free(item);
     891                 :      14207 :                 file->wal->size--;
     892                 :            :             } else {
     893                 :      13069 :                 e2 = list_next(e2);
     894                 :            :             }
     895                 :            :         }
     896                 :      27975 :         e1 = list_next(e1);
     897                 :            : 
     898         [ +  + ]:      27975 :         if (list_begin(&header->items) == NULL) {
     899                 :            :             // wal_item_header becomes empty
     900                 :            :             // free header and remove from hash table & wal list
     901                 :      14206 :             list_remove(&file->wal->list, &header->list_elem);
     902                 :      14206 :             hash_remove(&file->wal->hash_bykey, &header->he_key);
     903                 :      14206 :             free(header->key);
     904                 :      14206 :             free(header);
     905                 :            : 
     906         [ +  + ]:      14206 :             if (committed) {
     907                 :            :                 // this document was committed
     908                 :            :                 // num_docs and num_deletes should be updated
     909 [ +  + ][ -  + ]:      14182 :                 if (committed_item_action == WAL_ACT_LOGICAL_REMOVE ||
     910                 :            :                     committed_item_action == WAL_ACT_REMOVE) {
     911                 :         20 :                     _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
     912                 :            :                 }
     913                 :      14182 :                 _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, -1);
     914                 :            :             }
     915                 :            :         }
     916                 :            :     }
     917                 :            : 
     918                 :        521 :     spin_unlock(&file->wal->lock);
     919                 :        521 :     return FDB_RESULT_SUCCESS;
     920                 :            : }
     921                 :            : 
     922                 :        250 : fdb_status wal_close(struct filemgr *file)
     923                 :            : {
     924                 :        250 :     return _wal_close(file, WAL_DISCARD_UNCOMMITTED_ONLY, NULL);
     925                 :            : }
     926                 :            : 
     927                 :            : // discard all WAL entries
     928                 :        249 : fdb_status wal_shutdown(struct filemgr *file)
     929                 :            : {
     930                 :        249 :     fdb_status wr = _wal_close(file, WAL_DISCARD_ALL, NULL);
     931                 :        249 :     file->wal->size = 0;
     932                 :        249 :     file->wal->num_flushable = 0;
     933                 :        249 :     return wr;
     934                 :            : }
     935                 :            : 
     936                 :            : // discard all WAL entries belonging to KV_ID
     937                 :         22 : fdb_status wal_close_kv_ins(struct filemgr *file,
     938                 :            :                             fdb_kvs_id_t kv_id)
     939                 :            : {
     940                 :         22 :     return _wal_close(file, WAL_DISCARD_KV_INS, &kv_id);
     941                 :            : }
     942                 :            : 
     943                 :        222 : size_t wal_get_size(struct filemgr *file)
     944                 :            : {
     945                 :        222 :     return file->wal->size;
     946                 :            : }
     947                 :            : 
     948                 :    2407614 : size_t wal_get_num_flushable(struct filemgr *file)
     949                 :            : {
     950                 :    2407614 :     return file->wal->num_flushable;
     951                 :            : }
     952                 :            : 
     953                 :       1091 : size_t wal_get_num_docs(struct filemgr *file) {
     954                 :       1091 :     return _kvs_stat_get_sum(file, KVS_STAT_WAL_NDOCS);
     955                 :            : }
     956                 :            : 
     957                 :       1091 : size_t wal_get_num_deletes(struct filemgr *file) {
     958                 :       1091 :     return _kvs_stat_get_sum(file, KVS_STAT_WAL_NDELETES);
     959                 :            : }
     960                 :            : 
     961                 :       1115 : size_t wal_get_datasize(struct filemgr *file)
     962                 :            : {
     963                 :       1115 :     size_t datasize = 0;
     964                 :       1115 :     spin_lock(&file->wal->lock);
     965                 :       1115 :     datasize = file->wal->datasize;
     966                 :       1115 :     spin_unlock(&file->wal->lock);
     967                 :            : 
     968                 :       1115 :     return datasize;
     969                 :            : }
     970                 :            : 
     971                 :       1357 : void wal_set_dirty_status(struct filemgr *file, wal_dirty_t status)
     972                 :            : {
     973                 :       1357 :     spin_lock(&file->wal->lock);
     974                 :       1357 :     file->wal->wal_dirty = status;
     975                 :       1357 :     spin_unlock(&file->wal->lock);
     976                 :       1357 : }
     977                 :            : 
     978                 :    2476642 : wal_dirty_t wal_get_dirty_status(struct filemgr *file)
     979                 :            : {
     980                 :            :     wal_dirty_t ret;
     981                 :    2476642 :     spin_lock(&file->wal->lock);
     982                 :    2476642 :     ret = file->wal->wal_dirty;
     983                 :    2476642 :     spin_unlock(&file->wal->lock);
     984                 :    2476642 :     return ret;
     985                 :            : }
     986                 :            : 
     987                 :       1474 : void wal_add_transaction(struct filemgr *file, fdb_txn *txn)
     988                 :            : {
     989                 :       1474 :     spin_lock(&file->wal->lock);
     990                 :       1474 :     list_push_front(&file->wal->txn_list, &txn->wrapper->le);
     991                 :       1474 :     spin_unlock(&file->wal->lock);
     992                 :       1474 : }
     993                 :            : 
     994                 :       1473 : void wal_remove_transaction(struct filemgr *file, fdb_txn *txn)
     995                 :            : {
     996                 :       1473 :     spin_lock(&file->wal->lock);
     997                 :       1473 :     list_remove(&file->wal->txn_list, &txn->wrapper->le);
     998                 :       1473 :     spin_unlock(&file->wal->lock);
     999                 :       1473 : }
    1000                 :            : 
    1001                 :        310 : fdb_txn * wal_earliest_txn(struct filemgr *file, fdb_txn *cur_txn)
    1002                 :            : {
    1003                 :            :     struct list_elem *le;
    1004                 :            :     struct wal_txn_wrapper *txn_wrapper;
    1005                 :            :     fdb_txn *txn;
    1006                 :        310 :     fdb_txn *ret = NULL;
    1007                 :        310 :     bid_t bid = BLK_NOT_FOUND;
    1008                 :            : 
    1009                 :        310 :     spin_lock(&file->wal->lock);
    1010                 :            : 
    1011                 :        310 :     le = list_begin(&file->wal->txn_list);
    1012         [ +  + ]:        654 :     while(le) {
    1013                 :        344 :         txn_wrapper = _get_entry(le, struct wal_txn_wrapper, le);
    1014                 :        344 :         txn = txn_wrapper->txn;
    1015 [ +  + ][ +  + ]:        344 :         if (txn != cur_txn && list_begin(txn->items)) {
                 [ +  + ]
    1016 [ -  + ][ #  # ]:          7 :             if (bid == BLK_NOT_FOUND || txn->prev_hdr_bid < bid) {
    1017                 :          7 :                 bid = txn->prev_hdr_bid;
    1018                 :          7 :                 ret = txn;
    1019                 :            :             }
    1020                 :            :         }
    1021                 :        344 :         le = list_next(le);
    1022                 :            :     }
    1023                 :        310 :     spin_unlock(&file->wal->lock);
    1024                 :            : 
    1025                 :        310 :     return ret;
    1026                 :            : }
    1027                 :            : 
    1028                 :         25 : bool wal_txn_exists(struct filemgr *file)
    1029                 :            : {
    1030                 :            :     struct list_elem *le;
    1031                 :            :     struct wal_txn_wrapper *txn_wrapper;
    1032                 :            :     fdb_txn *txn;
    1033                 :            : 
    1034                 :         25 :     spin_lock(&file->wal->lock);
    1035                 :            : 
    1036                 :         25 :     le = list_begin(&file->wal->txn_list);
    1037         [ +  + ]:         48 :     while(le) {
    1038                 :         25 :         txn_wrapper = _get_entry(le, struct wal_txn_wrapper, le);
    1039                 :         25 :         txn = txn_wrapper->txn;
    1040         [ +  + ]:         25 :         if (txn != &file->global_txn) {
    1041                 :          2 :             spin_unlock(&file->wal->lock);
    1042                 :          2 :             return true;
    1043                 :            :         }
    1044                 :         23 :         le = list_next(le);
    1045                 :            :     }
    1046                 :         23 :     spin_unlock(&file->wal->lock);
    1047                 :            : 
    1048                 :         25 :     return false;
    1049                 :            : }

Generated by: LCOV version 1.11