Branch data Line data Source code
1 : : /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 : : /*
3 : : * Copyright 2010 Couchbase, Inc
4 : : *
5 : : * Licensed under the Apache License, Version 2.0 (the "License");
6 : : * you may not use this file except in compliance with the License.
7 : : * You may obtain a copy of the License at
8 : : *
9 : : * http://www.apache.org/licenses/LICENSE-2.0
10 : : *
11 : : * Unless required by applicable law or agreed to in writing, software
12 : : * distributed under the License is distributed on an "AS IS" BASIS,
13 : : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 : : * See the License for the specific language governing permissions and
15 : : * limitations under the License.
16 : : */
17 : :
18 : : #include <stdio.h>
19 : : #include <stdlib.h>
20 : : #include <string.h>
21 : : #include <fcntl.h>
22 : : #if !defined(WIN32) && !defined(_WIN32)
23 : : #include <sys/time.h>
24 : : #include <dirent.h>
25 : : #include <unistd.h>
26 : : #endif
27 : :
28 : : #include "libforestdb/forestdb.h"
29 : : #include "fdb_internal.h"
30 : : #include "filemgr.h"
31 : : #include "avltree.h"
32 : : #include "common.h"
33 : : #include "filemgr_ops.h"
34 : : #include "configuration.h"
35 : : #include "internal_types.h"
36 : : #include "compactor.h"
37 : : #include "wal.h"
38 : : #include "memleak.h"
39 : :
40 : : #ifdef __DEBUG
41 : : #ifndef __DEBUG_CPT
42 : : #undef DBG
43 : : #undef DBGCMD
44 : : #undef DBGSW
45 : : #define DBG(...)
46 : : #define DBGCMD(...)
47 : : #define DBGSW(n, ...)
48 : : #endif
49 : : #endif
50 : :
51 : : #define COMPACTOR_META_VERSION (1)
52 : : #define MAX_FNAMELEN (FDB_MAX_FILENAME_LEN)
53 : :
54 : : // variables for initialization
55 : : static volatile uint8_t compactor_initialized = 0;
56 : : #ifdef SPIN_INITIALIZER
57 : : static spin_t cpt_lock = SPIN_INITIALIZER;
58 : : #else
59 : : static volatile unsigned int init_lock_status = 0;
60 : : static spin_t cpt_lock;
61 : : #endif
62 : :
63 : : static thread_t compactor_tid;
64 : : static size_t sleep_duration = FDB_COMPACTOR_SLEEP_DURATION;
65 : :
66 : : static mutex_t sync_mutex;
67 : : static thread_cond_t sync_cond;
68 : :
69 : : typedef uint8_t compactor_status_t;
70 : : enum{
71 : : CPT_IDLE = 0,
72 : : CPT_WORKING = 1,
73 : : };
74 : : static compactor_status_t compactor_status;
75 : : static volatile uint8_t compactor_terminate_signal = 0;
76 : :
77 : : static struct avl_tree openfiles;
78 : :
79 : : // cursor of openfiles_elem that is currently being compacted.
80 : : // set to NULL if no file is being compacted.
81 : : static struct avl_node *target_cursor;
82 : :
83 : : struct openfiles_elem {
84 : : struct filemgr *file;
85 : : fdb_config config;
86 : : uint32_t register_count;
87 : : bool compaction_flag; // set when the file is being compacted
88 : : struct avl_node avl;
89 : : };
90 : :
91 : : struct compactor_args_t {
92 : : // void *aux; (reserved for future use)
93 : : size_t strcmp_len; // Used to search for prefix match
94 : : };
95 : : static struct compactor_args_t compactor_args;
96 : :
97 : : struct compactor_meta{
98 : : uint32_t version;
99 : : char filename[MAX_FNAMELEN];
100 : : uint32_t crc;
101 : : };
102 : :
103 : : #if !defined(WIN32) && !defined(_WIN32)
104 : 116 : struct timespec convert_reltime_to_abstime(unsigned int ms) {
105 : : struct timespec ts;
106 : : struct timeval tp;
107 : : uint64_t wakeup;
108 : :
109 : 116 : memset(&ts, 0, sizeof(ts));
110 : :
111 : : /*
112 : : * Unfortunately pthread_cond_timedwait doesn't support relative sleeps
113 : : * so we need to convert back to an absolute time.
114 : : */
115 : 116 : gettimeofday(&tp, NULL);
116 : 116 : wakeup = ((uint64_t)(tp.tv_sec) * 1000) + (tp.tv_usec / 1000) + ms;
117 : : /* Round up for sub ms */
118 [ + + ]: 116 : if ((tp.tv_usec % 1000) > 499) {
119 : 53 : ++wakeup;
120 : : }
121 : :
122 : 116 : ts.tv_sec = wakeup / 1000;
123 : 116 : wakeup %= 1000;
124 : 116 : ts.tv_nsec = wakeup * 1000000;
125 : 116 : return ts;
126 : : }
127 : : #endif
128 : :
129 : : #if !defined(WIN32) && !defined(_WIN32)
130 : 508 : static bool does_file_exist(const char *filename) {
131 : : struct stat st;
132 : 508 : int result = stat(filename, &st);
133 : 508 : return result == 0;
134 : : }
135 : : #else
136 : : static bool does_file_exist(const char *filename) {
137 : : return GetFileAttributes(filename) != INVALID_FILE_ATTRIBUTES;
138 : : }
139 : : #endif
140 : :
141 : : // compares file names
142 : 151 : int _compactor_cmp(struct avl_node *a, struct avl_node *b, void *aux)
143 : : {
144 : : struct openfiles_elem *aa, *bb;
145 : 151 : struct compactor_args_t *args = (struct compactor_args_t *)aux;
146 : 151 : aa = _get_entry(a, struct openfiles_elem, avl);
147 : 151 : bb = _get_entry(b, struct openfiles_elem, avl);
148 : 151 : return strncmp(aa->file->filename, bb->file->filename, args->strcmp_len);
149 : : }
150 : :
151 : 24 : INLINE uint64_t _compactor_estimate_space(struct openfiles_elem *elem)
152 : : {
153 : 24 : uint64_t ret = 0;
154 : : uint64_t datasize;
155 : : uint64_t nlivenodes;
156 : :
157 : 24 : datasize = _kvs_stat_get_sum(elem->file, KVS_STAT_DATASIZE);
158 : 24 : nlivenodes = _kvs_stat_get_sum(elem->file, KVS_STAT_NLIVENODES);
159 : :
160 : 24 : ret = datasize;
161 : 24 : ret += nlivenodes * elem->config.blocksize;
162 : 24 : ret += wal_get_datasize(elem->file);
163 : :
164 : 24 : return ret;
165 : : }
166 : :
167 : : // check if the compaction threshold is satisfied
168 : 33 : INLINE int _compactor_is_threshold_satisfied(struct openfiles_elem *elem)
169 : : {
170 : : uint64_t filesize;
171 : : uint64_t active_data;
172 : : int threshold;
173 : :
174 [ - + ]: 33 : if (filemgr_is_rollback_on(elem->file)) {
175 : : // do not perform compaction during rollback
176 : 0 : return 0;
177 : : }
178 : :
179 : 33 : threshold = elem->config.compaction_threshold;
180 [ + - ][ + + ]: 33 : if (elem->config.compaction_mode == FDB_COMPACTION_AUTO &&
[ + - ]
181 : 24 : threshold > 0 && !elem->compaction_flag)
182 : : {
183 : 24 : filesize = filemgr_get_pos(elem->file);
184 : 24 : active_data = _compactor_estimate_space(elem);
185 [ + - ][ + - ]: 24 : if (active_data == 0 || active_data >= filesize ||
[ + + ]
186 : : filesize < elem->config.compaction_minimum_filesize) {
187 : 9 : return 0;
188 : : }
189 : :
190 : 15 : return ((filesize / 100.0 * threshold) < (filesize - active_data));
191 : : } else {
192 : 33 : return 0;
193 : : }
194 : : }
195 : :
196 : : // return the location of '.'
197 : 73 : INLINE int _compactor_prefix_len(char *filename)
198 : : {
199 : : int i;
200 : 73 : int file_len = strlen(filename);
201 : 73 : int prefix_len = 0;
202 : : // find the first '.'
203 [ + + ]: 223 : for (i=file_len-1; i>=0; --i){
204 [ + + ]: 222 : if (filename[i] == '.') {
205 : 72 : prefix_len = i+1;
206 : 72 : break;
207 : : }
208 : : }
209 : 73 : return prefix_len;
210 : : }
211 : :
212 : 13 : static void _compactor_get_vfilename(char *filename, char *vfilename)
213 : : {
214 : 13 : int prefix_len = _compactor_prefix_len(filename);
215 : :
216 [ + - ]: 13 : if (prefix_len > 0) {
217 : 13 : strncpy(vfilename, filename, prefix_len-1);
218 : 13 : vfilename[prefix_len-1] = 0;
219 : : }
220 : 13 : }
221 : :
222 : 39 : static void _compactor_convert_dbfile_to_metafile(char *dbfile, char *metafile)
223 : : {
224 : 39 : int prefix_len = _compactor_prefix_len(dbfile);
225 : :
226 [ + - ]: 39 : if (prefix_len > 0) {
227 : 39 : strncpy(metafile, dbfile, prefix_len);
228 : 39 : metafile[prefix_len] = 0;
229 : 39 : strcat(metafile, "meta");
230 : : }
231 : 39 : }
232 : :
233 : 20 : static bool _allDigit(char *str) {
234 : 20 : int numchar = strlen(str);
235 [ + + ]: 43 : for(int i = 0; i < numchar; ++i) {
236 [ + + ][ + + ]: 28 : if (str[i] < '0' || str[i] > '9') {
237 : 5 : return false;
238 : : }
239 : : }
240 : 20 : return true;
241 : : }
242 : :
243 : 21 : void compactor_get_next_filename(char *file, char *nextfile)
244 : : {
245 : 21 : int compaction_no = 0;
246 : 21 : int prefix_len = _compactor_prefix_len(file);
247 : : char str_no[24];
248 : :
249 [ + + ][ + + ]: 21 : if (prefix_len > 0 && _allDigit(file + prefix_len)) {
[ + + ]
250 : 15 : sscanf(file+prefix_len, "%d", &compaction_no);
251 : 15 : strncpy(nextfile, file, prefix_len);
252 [ - + ]: 15 : do {
253 : 15 : nextfile[prefix_len] = 0;
254 : 15 : sprintf(str_no, "%d", ++compaction_no);
255 : 15 : strcat(nextfile, str_no);
256 : : } while (does_file_exist(nextfile));
257 : : } else {
258 [ + + ]: 7 : do {
259 : 7 : strcpy(nextfile, file);
260 : 7 : sprintf(str_no, ".%d", ++compaction_no);
261 : 7 : strcat(nextfile, str_no);
262 : : } while (does_file_exist(nextfile));
263 : : }
264 : 21 : }
265 : :
266 : 3 : bool compactor_switch_compaction_flag(struct filemgr *file, bool flag)
267 : : {
268 : 3 : struct avl_node *a = NULL;
269 : : struct openfiles_elem query, *elem;
270 : :
271 : 3 : spin_lock(&cpt_lock);
272 : 3 : query.file = file;
273 : 3 : a = avl_search(&openfiles, &query.avl, _compactor_cmp);
274 [ + - ]: 3 : if (a) {
275 : : // found
276 : 3 : elem = _get_entry(a, struct openfiles_elem, avl);
277 [ + + ]: 3 : if (elem->compaction_flag == flag) {
278 : : // already switched by other thread .. return false
279 : 1 : spin_unlock(&cpt_lock);
280 : 1 : return false;
281 : : }
282 : : // switch
283 : 2 : elem->compaction_flag = flag;
284 : 2 : spin_unlock(&cpt_lock);
285 : 2 : return true;
286 : : }
287 : : // file doesn't exist .. already compacted or deregistered
288 : 0 : spin_unlock(&cpt_lock);
289 : 3 : return false;
290 : : }
291 : :
292 : 105 : void * compactor_thread(void *voidargs)
293 : : {
294 : : char filename[MAX_FNAMELEN];
295 : : char vfilename[MAX_FNAMELEN];
296 : : char new_filename[MAX_FNAMELEN];
297 : : fdb_file_handle *fhandle;
298 : : fdb_config config;
299 : : fdb_status fs;
300 : : struct avl_node *a;
301 : : struct openfiles_elem *elem, *target;
302 : :
303 : : // Sleep for 10 secs by default to allow applications to warm up their data.
304 : : // TODO: Need to implement more flexible way of scheduling the compaction
305 : : // daemon (e.g., public APIs to start / stop the compaction daemon).
306 : 105 : mutex_lock(&sync_mutex);
307 : 105 : thread_cond_timedwait(&sync_cond, &sync_mutex, sleep_duration * 1000);
308 : 105 : mutex_unlock(&sync_mutex);
309 : :
310 : 8 : while (1) {
311 : 113 : target = NULL;
312 : :
313 : 113 : spin_lock(&cpt_lock);
314 : 113 : a = avl_first(&openfiles);
315 [ + + ]: 141 : while(a) {
316 : 33 : elem = _get_entry(a, struct openfiles_elem, avl);
317 : :
318 [ + + ]: 33 : if (_compactor_is_threshold_satisfied(elem)) {
319 : : // perform compaction
320 : 13 : strcpy(filename, elem->file->filename);
321 : 13 : _compactor_get_vfilename(filename, vfilename);
322 : 13 : config = elem->config;
323 : 13 : compactor_status = CPT_WORKING;
324 : : // set target_cursor to avoid deregistering of the 'elem'
325 : 13 : target_cursor = &elem->avl;
326 : : // set compaction flag
327 : 13 : elem->compaction_flag = true;
328 : 13 : spin_unlock(&cpt_lock);
329 : :
330 : 13 : fs = fdb_open_for_compactor(&fhandle, vfilename, &config);
331 [ + - ]: 13 : if (fs == FDB_RESULT_SUCCESS) {
332 : 13 : compactor_get_next_filename(filename, new_filename);
333 : 13 : fdb_compact_file(fhandle, new_filename, false);
334 : :
335 : 13 : spin_lock(&cpt_lock);
336 : 13 : a = avl_next(target_cursor);
337 : : // we have to set cursor to NULL before fdb_close
338 : 13 : target_cursor = NULL;
339 : 13 : spin_unlock(&cpt_lock);
340 : :
341 : 13 : fdb_close(fhandle);
342 : :
343 : 13 : spin_lock(&cpt_lock);
344 : 13 : compactor_status = CPT_IDLE;
345 : : } else {
346 : : // fail to open file
347 : 0 : spin_lock(&cpt_lock);
348 : 0 : compactor_status = CPT_IDLE;
349 : 0 : a = avl_next(target_cursor);
350 : 0 : target_cursor = NULL;
351 : : // clear compaction flag
352 : 0 : elem->compaction_flag = false;
353 : : }
354 : : } else {
355 : 20 : a = avl_next(a);
356 : : }
357 [ + + ]: 33 : if (compactor_terminate_signal) {
358 : 5 : spin_unlock(&cpt_lock);
359 : 5 : return NULL;
360 : : }
361 : : }
362 : 108 : spin_unlock(&cpt_lock);
363 : :
364 : 108 : mutex_lock(&sync_mutex);
365 [ + + ]: 108 : if (compactor_terminate_signal) {
366 : 97 : mutex_unlock(&sync_mutex);
367 : 97 : break;
368 : : }
369 : 11 : thread_cond_timedwait(&sync_cond, &sync_mutex, sleep_duration * 1000);
370 [ + + ]: 11 : if (compactor_terminate_signal) {
371 : 3 : mutex_unlock(&sync_mutex);
372 : 3 : break;
373 : : }
374 : 8 : mutex_unlock(&sync_mutex);
375 : : }
376 : 105 : return NULL;
377 : : }
378 : :
379 : 106 : void compactor_init(struct compactor_config *config)
380 : : {
381 [ + + ]: 106 : if (!compactor_initialized) {
382 : : #ifndef SPIN_INITIALIZER
383 : : // Note that only Windows passes through this routine
384 : : if (InterlockedCompareExchange(&init_lock_status, 1, 0) == 0) {
385 : : // atomically initialize spin lock only once
386 : : spin_init(&cpt_lock);
387 : : init_lock_status = 2;
388 : : } else {
389 : : // the others .. wait until initializing 'cpt_lock' is done
390 : : while (init_lock_status != 2) {
391 : : Sleep(1);
392 : : }
393 : : }
394 : : #endif
395 : :
396 : 105 : spin_lock(&cpt_lock);
397 [ + - ]: 105 : if (!compactor_initialized) {
398 : : // initialize
399 : 105 : compactor_args.strcmp_len = MAX_FNAMELEN;
400 : 105 : avl_init(&openfiles, &compactor_args);
401 : 105 : target_cursor = NULL;
402 : :
403 [ + - ]: 105 : if (config) {
404 [ + - ]: 105 : if (config->sleep_duration > 0) {
405 : 105 : sleep_duration = config->sleep_duration;
406 : : }
407 : : }
408 : :
409 : 105 : compactor_status = CPT_IDLE;
410 : 105 : compactor_terminate_signal = 0;
411 : :
412 : 105 : mutex_init(&sync_mutex);
413 : 105 : thread_cond_init(&sync_cond);
414 : :
415 : : // create worker thread
416 : 105 : thread_create(&compactor_tid, compactor_thread, NULL);
417 : :
418 : 105 : compactor_initialized = 1;
419 : : }
420 : 105 : spin_unlock(&cpt_lock);
421 : : }
422 : 106 : }
423 : :
424 : 105 : void compactor_shutdown()
425 : : {
426 : : void *ret;
427 : 105 : struct avl_node *a = NULL;
428 : : struct openfiles_elem *elem;
429 : :
430 : : // set terminate signal
431 : 105 : mutex_lock(&sync_mutex);
432 : 105 : compactor_terminate_signal = 1;
433 : 105 : thread_cond_signal(&sync_cond);
434 : 105 : mutex_unlock(&sync_mutex);
435 : :
436 : 105 : thread_join(compactor_tid, &ret);
437 : :
438 : 105 : spin_lock(&cpt_lock);
439 : : // free all elems in the tree
440 : 105 : a = avl_first(&openfiles);
441 [ - + ]: 105 : while (a) {
442 : 0 : elem = _get_entry(a, struct openfiles_elem, avl);
443 : 0 : a = avl_next(a);
444 : :
445 : 0 : avl_remove(&openfiles, &elem->avl);
446 : 0 : free(elem);
447 : : }
448 : :
449 : 105 : sleep_duration = FDB_COMPACTOR_SLEEP_DURATION;
450 : 105 : compactor_initialized = 0;
451 : 105 : mutex_destroy(&sync_mutex);
452 : 105 : thread_cond_destroy(&sync_cond);
453 : 105 : spin_unlock(&cpt_lock);
454 : :
455 : : #ifndef SPIN_INITIALIZER
456 : : spin_destroy(&cpt_lock);
457 : : init_lock_status = 0;
458 : : #else
459 : 105 : cpt_lock = SPIN_INITIALIZER;
460 : : #endif
461 : 105 : }
462 : :
463 : : fdb_status _compactor_store_metafile(char *metafile,
464 : : struct compactor_meta *metadata);
465 : 41 : fdb_status compactor_register_file(struct filemgr *file, fdb_config *config)
466 : : {
467 : 41 : fdb_status fs = FDB_RESULT_SUCCESS;
468 : 41 : struct avl_node *a = NULL;
469 : : struct openfiles_elem query, *elem;
470 : :
471 : : // first search the existing file
472 : 41 : spin_lock(&cpt_lock);
473 : 41 : query.file = file;
474 : 41 : a = avl_search(&openfiles, &query.avl, _compactor_cmp);
475 [ + + ]: 41 : if (a == NULL) {
476 : : // doesn't exist
477 : : // create elem and insert into tree
478 : : char path[MAX_FNAMELEN];
479 : : struct compactor_meta meta;
480 : :
481 : 25 : elem = (struct openfiles_elem *)malloc(sizeof(struct openfiles_elem));
482 : 25 : elem->file = file;
483 : 25 : elem->config = *config;
484 : 25 : elem->register_count = 1;
485 : 25 : elem->compaction_flag = false;
486 : 25 : avl_insert(&openfiles, &elem->avl, _compactor_cmp);
487 : :
488 : : // store in metafile
489 : 25 : _compactor_convert_dbfile_to_metafile(file->filename, path);
490 : 25 : strcpy(meta.filename, file->filename);
491 : 25 : fs = _compactor_store_metafile(path, &meta);
492 : : } else {
493 : : // already exists
494 : 16 : elem = _get_entry(a, struct openfiles_elem, avl);
495 : 16 : elem->register_count++;
496 : : }
497 : 41 : spin_unlock(&cpt_lock);
498 : 41 : return fs;
499 : : }
500 : :
501 : 34 : void compactor_deregister_file(struct filemgr *file)
502 : : {
503 : 34 : struct avl_node *a = NULL;
504 : : struct openfiles_elem query, *elem;
505 : :
506 : 34 : spin_lock(&cpt_lock);
507 : 34 : query.file = file;
508 : 34 : a = avl_search(&openfiles, &query.avl, _compactor_cmp);
509 [ + + ]: 34 : if (a) {
510 : 33 : elem = _get_entry(a, struct openfiles_elem, avl);
511 [ + + ]: 33 : if ((--elem->register_count) == 0) {
512 : : // if no handle refers this file
513 [ + - ]: 25 : if (target_cursor == &elem->avl) {
514 : : // This file is waiting for compaction by compactor (but not opened
515 : : // yet). Do not remove 'elem' for now. The 'elem' will be automatically
516 : : // replaced after the compaction is done by calling
517 : : // 'compactor_switch_file()'.
518 : : } else {
519 : : // remove from the tree
520 : 25 : avl_remove(&openfiles, &elem->avl);
521 : 25 : free(elem);
522 : : }
523 : : }
524 : : }
525 : 34 : spin_unlock(&cpt_lock);
526 : 34 : }
527 : :
528 : 1 : void compactor_change_threshold(struct filemgr *file, size_t new_threshold)
529 : : {
530 : 1 : struct avl_node *a = NULL;
531 : : struct openfiles_elem query, *elem;
532 : :
533 : 1 : spin_lock(&cpt_lock);
534 : 1 : query.file = file;
535 : 1 : a = avl_search(&openfiles, &query.avl, _compactor_cmp);
536 [ + - ]: 1 : if (a) {
537 : 1 : elem = _get_entry(a, struct openfiles_elem, avl);
538 : 1 : elem->config.compaction_threshold = new_threshold;
539 : : }
540 : 1 : spin_unlock(&cpt_lock);
541 : 1 : }
542 : :
543 : 531 : struct compactor_meta * _compactor_read_metafile(char *metafile,
544 : : struct compactor_meta *metadata)
545 : : {
546 : : int fd_meta, fd_db;
547 : : ssize_t ret;
548 : 531 : uint8_t *buf = alca(uint8_t, sizeof(struct compactor_meta));
549 : : uint32_t crc;
550 : : struct filemgr_ops *ops;
551 : : struct compactor_meta meta;
552 : :
553 : 531 : ops = get_filemgr_ops();
554 : 530 : fd_meta = ops->open(metafile, O_RDONLY, 0644);
555 : :
556 [ + + ]: 532 : if (fd_meta >= 0) {
557 : : // metafile exists .. read metadata
558 : 35 : ret = ops->pread(fd_meta, buf, sizeof(struct compactor_meta), 0);
559 [ - + ]: 35 : if (ret < sizeof(struct compactor_meta)) {
560 : 0 : ops->close(fd_meta);
561 : 0 : return NULL;
562 : : }
563 : 35 : memcpy(&meta, buf, sizeof(struct compactor_meta));
564 : 35 : meta.version = _endian_decode(meta.version);
565 : 35 : meta.crc = _endian_decode(meta.crc);
566 : 35 : ops->close(fd_meta);
567 : :
568 : : // CRC check
569 : 35 : crc = chksum(buf, sizeof(struct compactor_meta) - sizeof(crc));
570 [ - + ]: 35 : if (crc != meta.crc) {
571 : 0 : return NULL;
572 : : }
573 : : // check if the file exists
574 : 35 : fd_db = ops->open(meta.filename, O_RDONLY, 0644);
575 [ + + ]: 35 : if (fd_db < 0) {
576 : : // file doesn't exist
577 : 1 : return NULL;
578 : : }
579 : 34 : ops->close(fd_db);
580 : : } else {
581 : : // file doesn't exist
582 : 532 : return NULL;
583 : : }
584 : :
585 : 34 : *metadata = meta;
586 : 34 : return metadata;
587 : : }
588 : :
589 : 39 : fdb_status _compactor_store_metafile(char *metafile,
590 : : struct compactor_meta *metadata)
591 : : {
592 : : int fd_meta;
593 : : ssize_t ret;
594 : : uint32_t crc;
595 : : struct filemgr_ops *ops;
596 : : struct compactor_meta meta;
597 : :
598 : 39 : ops = get_filemgr_ops();
599 : 39 : fd_meta = ops->open(metafile, O_RDWR | O_CREAT, 0644);
600 : :
601 [ + - ]: 39 : if (fd_meta >= 0){
602 : 39 : meta.version = _endian_encode(COMPACTOR_META_VERSION);
603 : 39 : strcpy(meta.filename, metadata->filename);
604 : 39 : crc = chksum((void*)&meta, sizeof(struct compactor_meta) - sizeof(crc));
605 : 39 : meta.crc = _endian_encode(crc);
606 : :
607 : 39 : ret = ops->pwrite(fd_meta, &meta, sizeof(struct compactor_meta), 0);
608 : 39 : ops->fsync(fd_meta);
609 : 39 : ops->close(fd_meta);
610 [ - + ]: 39 : if (ret < sizeof(struct compactor_meta)) {
611 : 0 : return FDB_RESULT_WRITE_FAIL;
612 : : }
613 : : } else {
614 : 0 : return FDB_RESULT_OPEN_FAIL;
615 : : }
616 : :
617 : 39 : return FDB_RESULT_SUCCESS;
618 : : }
619 : :
620 : 53 : void compactor_switch_file(struct filemgr *old_file, struct filemgr *new_file)
621 : : {
622 : 53 : struct avl_node *a = NULL;
623 : : struct openfiles_elem query, *elem;
624 : : struct compactor_meta meta;
625 : :
626 : 53 : spin_lock(&cpt_lock);
627 : 53 : query.file = old_file;
628 : 53 : a = avl_search(&openfiles, &query.avl, _compactor_cmp);
629 [ + + ]: 53 : if (a) {
630 : : char metafile[MAX_FNAMELEN];
631 : :
632 : 14 : elem = _get_entry(a, struct openfiles_elem, avl);
633 : 14 : avl_remove(&openfiles, a);
634 : 14 : elem->file = new_file;
635 : 14 : elem->register_count = 1;
636 : : // clear compaction flag
637 : 14 : elem->compaction_flag = false;
638 : 14 : avl_insert(&openfiles, &elem->avl, _compactor_cmp);
639 : :
640 [ + - ]: 14 : if (elem->config.compaction_mode == FDB_COMPACTION_AUTO) {
641 : 14 : _compactor_convert_dbfile_to_metafile(new_file->filename, metafile);
642 : 14 : strcpy(meta.filename, new_file->filename);
643 : 14 : _compactor_store_metafile(metafile, &meta);
644 : : }
645 : 14 : spin_unlock(&cpt_lock);
646 : :
647 : : } else {
648 : 39 : spin_unlock(&cpt_lock);
649 : : }
650 : 53 : }
651 : :
652 : 530 : fdb_status compactor_get_actual_filename(const char *filename,
653 : : char *actual_filename,
654 : : fdb_compaction_mode_t comp_mode)
655 : : {
656 : : int i;
657 : : int filename_len;
658 : : int dirname_len;
659 : 530 : int compaction_no, max_compaction_no = -1;
660 : : char path[MAX_FNAMELEN];
661 : : char dirname[MAX_FNAMELEN], prefix[MAX_FNAMELEN];
662 : 530 : fdb_status fs = FDB_RESULT_SUCCESS;
663 : : struct compactor_meta meta, *meta_ptr;
664 : :
665 : : // get actual filename from metafile
666 : 530 : sprintf(path, "%s.meta", filename);
667 : 530 : meta_ptr = _compactor_read_metafile(path, &meta);
668 : :
669 [ + + ]: 531 : if (meta_ptr == NULL) {
670 [ + + ][ + + ]: 497 : if (comp_mode == FDB_COMPACTION_MANUAL && does_file_exist(filename)) {
[ + + ]
671 : 373 : strcpy(actual_filename, filename);
672 : 373 : return FDB_RESULT_SUCCESS;
673 : : }
674 : :
675 : : // error handling .. scan directory
676 : : // backward search until find the first '/' or '\' (Windows)
677 : 124 : filename_len = strlen(filename);
678 : 124 : dirname_len = 0;
679 : :
680 : : #if !defined(WIN32) && !defined(_WIN32)
681 : : DIR *dir_info;
682 : : struct dirent *dir_entry;
683 : :
684 [ + + ]: 968 : for (i=filename_len-1; i>=0; --i){
685 [ + + ]: 950 : if (filename[i] == '/') {
686 : 106 : dirname_len = i+1;
687 : 106 : break;
688 : : }
689 : : }
690 : :
691 [ + + ]: 124 : if (dirname_len > 0) {
692 : 105 : strncpy(dirname, filename, dirname_len);
693 : 105 : dirname[dirname_len] = 0;
694 : : } else {
695 : 19 : strcpy(dirname, ".");
696 : : }
697 : 124 : strcpy(prefix, filename + dirname_len);
698 : 124 : strcat(prefix, ".");
699 : :
700 : 124 : dir_info = opendir(dirname);
701 [ + - ]: 125 : if (dir_info != NULL) {
702 [ + + ]: 3044 : while ((dir_entry = readdir(dir_info))) {
703 [ + + ]: 2919 : if (!strncmp(dir_entry->d_name, prefix, strlen(prefix))) {
704 : 5 : compaction_no = -1;
705 : 5 : sscanf(dir_entry->d_name + strlen(prefix), "%d", &compaction_no);
706 [ + + ]: 5 : if (compaction_no >= 0) {
707 [ + - ]: 4 : if (compaction_no > max_compaction_no) {
708 : 4 : max_compaction_no = compaction_no;
709 : : }
710 : : }
711 : : }
712 : : }
713 : 125 : closedir(dir_info);
714 : : }
715 : : #else
716 : : // Windows
717 : : for (i=filename_len-1; i>=0; --i){
718 : : if (filename[i] == '/' || filename[i] == '\\') {
719 : : dirname_len = i+1;
720 : : break;
721 : : }
722 : : }
723 : :
724 : : if (dirname_len > 0) {
725 : : strncpy(dirname, filename, dirname_len);
726 : : dirname[dirname_len] = 0;
727 : : } else {
728 : : strcpy(dirname, ".");
729 : : }
730 : : strcpy(prefix, filename + dirname_len);
731 : : strcat(prefix, ".");
732 : :
733 : : WIN32_FIND_DATA filedata;
734 : : HANDLE hfind;
735 : : char query_str[MAX_FNAMELEN];
736 : :
737 : : // find all files start with 'prefix'
738 : : sprintf(query_str, "%s*", prefix);
739 : : hfind = FindFirstFile(query_str, &filedata);
740 : : while (hfind != INVALID_HANDLE_VALUE) {
741 : : if (!strncmp(filedata.cFileName, prefix, strlen(prefix))) {
742 : : compaction_no = -1;
743 : : sscanf(filedata.cFileName + strlen(prefix), "%d", &compaction_no);
744 : : if (compaction_no >= 0) {
745 : : if (compaction_no > max_compaction_no) {
746 : : max_compaction_no = compaction_no;
747 : : }
748 : : }
749 : : }
750 : :
751 : : if (!FindNextFile(hfind, &filedata)) {
752 : : FindClose(hfind);
753 : : hfind = INVALID_HANDLE_VALUE;
754 : : }
755 : : }
756 : :
757 : : #endif
758 : :
759 [ + + ]: 125 : if (max_compaction_no < 0) {
760 [ + + ]: 121 : if (comp_mode == FDB_COMPACTION_AUTO) {
761 : : // DB files with a revision number are not found.
762 : : // update metadata's filename to '[filename].0'
763 : 9 : sprintf(meta.filename, "%s.0", filename);
764 : : } else { // Manual compaction mode.
765 : : // Simply use the file name passed to this function.
766 : 112 : strcpy(actual_filename, filename);
767 : 112 : return FDB_RESULT_SUCCESS;
768 : : }
769 : : } else {
770 : : // return the file that has the largest compaction number
771 : 4 : sprintf(meta.filename, "%s.%d", filename, max_compaction_no);
772 : 4 : fs = FDB_RESULT_SUCCESS;
773 : : }
774 [ + - ]: 13 : if (fs == FDB_RESULT_SUCCESS) {
775 : 13 : strcpy(actual_filename, meta.filename);
776 : : }
777 : 13 : return fs;
778 : :
779 : : } else {
780 : : // metadata is successfully read from the metafile .. just return the filename
781 : 34 : strcpy(actual_filename, meta.filename);
782 : 532 : return FDB_RESULT_SUCCESS;
783 : : }
784 : : }
785 : :
786 : 541 : bool compactor_is_valid_mode(const char *filename, fdb_config *config)
787 : : {
788 : : int fd;
789 : : char path[MAX_FNAMELEN];
790 : : struct filemgr_ops *ops;
791 : :
792 : 541 : ops = get_filemgr_ops();
793 : :
794 [ + + ]: 541 : if (config->compaction_mode == FDB_COMPACTION_AUTO) {
795 : : // auto compaction mode: invalid when
796 : : // the file '[filename]' exists
797 : 49 : fd = ops->open(filename, O_RDONLY, 0644);
798 [ + + ]: 48 : if (fd != FDB_RESULT_NO_SUCH_FILE) {
799 : 2 : ops->close(fd);
800 : 2 : return false;
801 : : }
802 : :
803 [ + - ]: 492 : } else if (config->compaction_mode == FDB_COMPACTION_MANUAL) {
804 : : // manual compaction mode: invalid when
805 : : // the file '[filename].meta' exists
806 : 492 : sprintf(path, "%s.meta", filename);
807 : 492 : fd = ops->open(path, O_RDONLY, 0644);
808 [ + + ]: 491 : if (fd != FDB_RESULT_NO_SUCH_FILE) {
809 : 2 : ops->close(fd);
810 : 2 : return false;
811 : : }
812 : :
813 : : } else {
814 : : // unknown mode
815 : 0 : return false;
816 : : }
817 : :
818 : 539 : return true;
819 : : }
820 : :
821 : 1 : fdb_status _compactor_search_n_destroy(const char *filename)
822 : : {
823 : : int i;
824 : : int filename_len;
825 : : int dirname_len;
826 : : char dirname[MAX_FNAMELEN], prefix[MAX_FNAMELEN];
827 : 1 : fdb_status fs = FDB_RESULT_SUCCESS;
828 : :
829 : : // error handling .. scan directory
830 : : // backward search until find the first '/' or '\' (Windows)
831 : 1 : filename_len = strlen(filename);
832 : 1 : dirname_len = 0;
833 : :
834 : : #if !defined(WIN32) && !defined(_WIN32)
835 : : DIR *dir_info;
836 : : struct dirent *dir_entry;
837 : :
838 [ + + ]: 23 : for (i=filename_len-1; i>=0; --i){
839 [ - + ]: 22 : if (filename[i] == '/') {
840 : 0 : dirname_len = i+1;
841 : 0 : break;
842 : : }
843 : : }
844 : :
845 [ - + ]: 1 : if (dirname_len > 0) {
846 : 0 : strncpy(dirname, filename, dirname_len);
847 : 0 : dirname[dirname_len] = 0;
848 : : } else {
849 : 1 : strcpy(dirname, ".");
850 : : }
851 : 1 : strcpy(prefix, filename + dirname_len);
852 : 1 : strcat(prefix, ".");
853 : :
854 : 1 : dir_info = opendir(dirname);
855 [ + - ]: 1 : if (dir_info != NULL) {
856 [ + + ]: 31 : while ((dir_entry = readdir(dir_info))) {
857 [ + + ]: 30 : if (!strncmp(dir_entry->d_name, prefix, strlen(prefix))) {
858 : : // Need to check filemgr for possible open entry?
859 [ - + ]: 2 : if (remove(dir_entry->d_name)) {
860 : 0 : fs = FDB_RESULT_FILE_REMOVE_FAIL;
861 : 0 : closedir(dir_info);
862 : 0 : return fs;
863 : : }
864 : : }
865 : : }
866 : 1 : closedir(dir_info);
867 : : }
868 : : #else
869 : : // Windows
870 : : for (i=filename_len-1; i>=0; --i){
871 : : if (filename[i] == '/' || filename[i] == '\\') {
872 : : dirname_len = i+1;
873 : : break;
874 : : }
875 : : }
876 : :
877 : : if (dirname_len > 0) {
878 : : strncpy(dirname, filename, dirname_len);
879 : : dirname[dirname_len] = 0;
880 : : } else {
881 : : strcpy(dirname, ".");
882 : : }
883 : : strcpy(prefix, filename + dirname_len);
884 : : strcat(prefix, ".");
885 : :
886 : : WIN32_FIND_DATA filedata;
887 : : HANDLE hfind;
888 : : char query_str[MAX_FNAMELEN];
889 : :
890 : : // find all files start with 'prefix'
891 : : sprintf(query_str, "%s*", prefix);
892 : : hfind = FindFirstFile(query_str, &filedata);
893 : : while (hfind != INVALID_HANDLE_VALUE) {
894 : : if (!strncmp(filedata.cFileName, prefix, strlen(prefix))) {
895 : : // Need to check filemgr for possible open entry?
896 : : if (remove(filedata.cFileName)) {
897 : : fs = FDB_RESULT_FILE_REMOVE_FAIL;
898 : : FindClose(hfind);
899 : : hfind = INVALID_HANDLE_VALUE;
900 : : return fs;
901 : : }
902 : : }
903 : :
904 : : if (!FindNextFile(hfind, &filedata)) {
905 : : FindClose(hfind);
906 : : hfind = INVALID_HANDLE_VALUE;
907 : : }
908 : : }
909 : :
910 : : #endif
911 : 1 : return fs;
912 : : }
913 : :
914 : 1 : fdb_status compactor_destroy_file(char *filename,
915 : : fdb_config *config)
916 : : {
917 : 1 : struct avl_node *a = NULL;
918 : : struct openfiles_elem query, *elem;
919 : : struct filemgr query_file;
920 : 1 : struct filemgr *file = &query_file;
921 : : size_t strcmp_len;
922 : 1 : fdb_status status = FDB_RESULT_SUCCESS;
923 : : compactor_config c_config;
924 : :
925 : 1 : strcmp_len = strlen(filename);
926 : 1 : filename[strcmp_len] = '.'; // add a . suffix in place
927 : 1 : strcmp_len++;
928 : 1 : filename[strcmp_len] = '\0';
929 : 1 : file->filename = filename;
930 : :
931 : 1 : c_config.sleep_duration = config->compactor_sleep_duration;
932 : 1 : compactor_init(&c_config);
933 : :
934 : 1 : spin_lock(&cpt_lock); // TODO: use mutex as we are doing I/O
935 : 1 : query.file = file;
936 : 1 : compactor_args.strcmp_len = strcmp_len; // Do prefix match for all vers
937 : 1 : a = avl_search(&openfiles, &query.avl, _compactor_cmp);
938 [ - + ]: 1 : if (a) {
939 : 0 : elem = _get_entry(a, struct openfiles_elem, avl);
940 : : // if no handle refers this file
941 [ # # ]: 0 : if (target_cursor == &elem->avl) {
942 : : // This file is waiting for compaction by compactor
943 : : // Return a temporary failure, user must retry after sometime
944 : 0 : status = FDB_RESULT_IN_USE_BY_COMPACTOR;
945 : : } else { // File handle not closed, fail operation
946 : 0 : status = FDB_RESULT_FILE_IS_BUSY;
947 : : }
948 : : }
949 : 1 : compactor_args.strcmp_len = MAX_FNAMELEN; // restore for normal compare
950 : 1 : filename[strcmp_len - 1] = '\0'; // restore the filename
951 [ + - ]: 1 : if (status == FDB_RESULT_SUCCESS) {
952 : 1 : status = _compactor_search_n_destroy(file->filename);
953 : : }
954 : 1 : spin_unlock(&cpt_lock);
955 : :
956 : 1 : return status;
957 : : }
|