| 1 | /* |
|---|
| 2 | * (C) 2002 Clemson University and The University of Chicago |
|---|
| 3 | * |
|---|
| 4 | * See COPYING in top-level directory. |
|---|
| 5 | */ |
|---|
| 6 | |
|---|
| 7 | #include "dbpf-op-queue.h" |
|---|
| 8 | #include "gossip.h" |
|---|
| 9 | #include "pint-perf-counter.h" |
|---|
| 10 | #include "dbpf-sync.h" |
|---|
| 11 | #include "dbpf-thread.h" |
|---|
| 12 | |
|---|
/*
 * Coalescing contexts: each selects its own per-TROVE-context coalescing
 * state in sync_array and its own database of the collection.
 * COALESCE_CONTEXT_LAST is not a real context; it is the number of real
 * contexts and is used to size/iterate sync_array.
 */
enum s_sync_context_e
{
    COALESCE_CONTEXT_KEYVAL = 0, /* keyval ops, synced via coll->keyval_db */
    COALESCE_CONTEXT_DSPACE,     /* dataspace ops, synced via coll->ds_db */
    COALESCE_CONTEXT_LAST        /* count of contexts; not a valid index */
};
|---|
| 19 | |
|---|
| 20 | static dbpf_sync_context_t |
|---|
| 21 | sync_array[COALESCE_CONTEXT_LAST][TROVE_MAX_CONTEXTS]; |
|---|
| 22 | |
|---|
| 23 | extern dbpf_op_queue_p dbpf_completion_queue_array[TROVE_MAX_CONTEXTS]; |
|---|
| 24 | extern gen_mutex_t dbpf_completion_queue_array_mutex[TROVE_MAX_CONTEXTS]; |
|---|
| 25 | extern pthread_cond_t dbpf_op_completed_cond; |
|---|
| 26 | |
|---|
| 27 | static int dbpf_sync_db( |
|---|
| 28 | DB * dbp, |
|---|
| 29 | enum s_sync_context_e sync_context_type, |
|---|
| 30 | dbpf_sync_context_t * sync_context) |
|---|
| 31 | { |
|---|
| 32 | int ret; |
|---|
| 33 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 34 | "[SYNC_COALESCE]:\tcoalesce %d sync start " |
|---|
| 35 | "in coalesce_queue:%d pending:%d\n", |
|---|
| 36 | sync_context_type, sync_context->coalesce_counter, |
|---|
| 37 | sync_context->sync_counter); |
|---|
| 38 | ret = dbp->sync(dbp, 0); |
|---|
| 39 | if(ret != 0) |
|---|
| 40 | { |
|---|
| 41 | gossip_err("db SYNC failed: %s\n", |
|---|
| 42 | db_strerror(ret)); |
|---|
| 43 | ret = -dbpf_db_error_to_trove_error(ret); |
|---|
| 44 | return ret; |
|---|
| 45 | } |
|---|
| 46 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 47 | "[SYNC_COALESCE]:\tcoalesce %d sync stop\n", |
|---|
| 48 | sync_context_type); |
|---|
| 49 | return 0; |
|---|
| 50 | } |
|---|
| 51 | |
|---|
| 52 | static int dbpf_sync_get_object_sync_context(enum dbpf_op_type type) |
|---|
| 53 | { |
|---|
| 54 | assert(DBPF_OP_IS_KEYVAL(type) || DBPF_OP_IS_DSPACE(type)); |
|---|
| 55 | |
|---|
| 56 | if(DBPF_OP_IS_KEYVAL(type)) |
|---|
| 57 | { |
|---|
| 58 | return COALESCE_CONTEXT_KEYVAL; |
|---|
| 59 | } |
|---|
| 60 | else |
|---|
| 61 | { |
|---|
| 62 | return COALESCE_CONTEXT_DSPACE; |
|---|
| 63 | } |
|---|
| 64 | } |
|---|
| 65 | |
|---|
| 66 | int dbpf_sync_context_init(int context_index) |
|---|
| 67 | { |
|---|
| 68 | int c; |
|---|
| 69 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 70 | "[SYNC_COALESCE]: dbpf_sync_context_init for " |
|---|
| 71 | "context %d called\n", |
|---|
| 72 | context_index); |
|---|
| 73 | for(c=0; c < COALESCE_CONTEXT_LAST; c++) |
|---|
| 74 | { |
|---|
| 75 | bzero(& sync_array[c][context_index], sizeof(dbpf_sync_context_t)); |
|---|
| 76 | |
|---|
| 77 | gen_mutex_init(&sync_array[c][context_index].mutex); |
|---|
| 78 | sync_array[c][context_index].sync_queue = dbpf_op_queue_new(); |
|---|
| 79 | } |
|---|
| 80 | |
|---|
| 81 | return 0; |
|---|
| 82 | } |
|---|
| 83 | |
|---|
| 84 | void dbpf_sync_context_destroy(int context_index) |
|---|
| 85 | { |
|---|
| 86 | int c; |
|---|
| 87 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 88 | "[SYNC_COALESCE]: dbpf_sync_context_destroy for " |
|---|
| 89 | "context %d called\n", |
|---|
| 90 | context_index); |
|---|
| 91 | for(c=0; c < COALESCE_CONTEXT_LAST; c++) |
|---|
| 92 | { |
|---|
| 93 | /* grab lock...should be the last one, since we are shutting down */ |
|---|
| 94 | gen_mutex_lock(&sync_array[c][context_index].mutex); |
|---|
| 95 | |
|---|
| 96 | /* we have to unlock the mutex before we can destroy it */ |
|---|
| 97 | gen_mutex_unlock(&sync_array[c][context_index].mutex); |
|---|
| 98 | |
|---|
| 99 | /* destroy the mutex */ |
|---|
| 100 | gen_mutex_destroy(&sync_array[c][context_index].mutex); |
|---|
| 101 | |
|---|
| 102 | /* cleanup the op queue */ |
|---|
| 103 | dbpf_op_queue_cleanup(sync_array[c][context_index].sync_queue); |
|---|
| 104 | } |
|---|
| 105 | } |
|---|
| 106 | |
|---|
| 107 | int dbpf_sync_coalesce(dbpf_queued_op_t *qop_p, int retcode, int * outcount) |
|---|
| 108 | { |
|---|
| 109 | |
|---|
| 110 | int ret = 0; |
|---|
| 111 | DB * dbp = NULL; |
|---|
| 112 | dbpf_sync_context_t * sync_context; |
|---|
| 113 | dbpf_queued_op_t *ready_op; |
|---|
| 114 | int sync_context_type; |
|---|
| 115 | struct dbpf_collection* coll = qop_p->op.coll_p; |
|---|
| 116 | int cid = qop_p->op.context_id; |
|---|
| 117 | |
|---|
| 118 | /* We want to set the state in all cases |
|---|
| 119 | */ |
|---|
| 120 | qop_p->state = retcode; |
|---|
| 121 | |
|---|
| 122 | if(!DBPF_OP_DOES_SYNC(qop_p->op.type)) |
|---|
| 123 | { |
|---|
| 124 | dbpf_queued_op_complete(qop_p, OP_COMPLETED); |
|---|
| 125 | (*outcount)++; |
|---|
| 126 | return 0; |
|---|
| 127 | } |
|---|
| 128 | |
|---|
| 129 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 130 | "[SYNC_COALESCE]: sync_coalesce called, " |
|---|
| 131 | "handle: %s, cid: %d\n", |
|---|
| 132 | PVFS_handle_to_str(qop_p->op.handle), cid); |
|---|
| 133 | |
|---|
| 134 | sync_context_type = dbpf_sync_get_object_sync_context(qop_p->op.type); |
|---|
| 135 | |
|---|
| 136 | if ( ! (qop_p->op.flags & TROVE_SYNC) ) { |
|---|
| 137 | /* |
|---|
| 138 | * No sync needed at all |
|---|
| 139 | */ |
|---|
| 140 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 141 | "[SYNC_COALESCE]: sync not needed, " |
|---|
| 142 | "moving to completion queue: %s\n", |
|---|
| 143 | PVFS_handle_to_str(qop_p->op.handle)); |
|---|
| 144 | dbpf_queued_op_complete(qop_p, OP_COMPLETED); |
|---|
| 145 | return 0; |
|---|
| 146 | } |
|---|
| 147 | |
|---|
| 148 | /* |
|---|
| 149 | * Now we know that this particular op is modifying |
|---|
| 150 | */ |
|---|
| 151 | sync_context = & sync_array[sync_context_type][cid]; |
|---|
| 152 | |
|---|
| 153 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 154 | "[SYNC_COALESCE]: sync_context: %p\n", sync_context); |
|---|
| 155 | |
|---|
| 156 | if( sync_context_type == COALESCE_CONTEXT_DSPACE ) |
|---|
| 157 | { |
|---|
| 158 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 159 | "[SYNC_COALESCE]: sync context type is DSPACE\n"); |
|---|
| 160 | dbp = qop_p->op.coll_p->ds_db; |
|---|
| 161 | } |
|---|
| 162 | else if( sync_context_type == COALESCE_CONTEXT_KEYVAL ) |
|---|
| 163 | { |
|---|
| 164 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 165 | "[SYNC_COALESCE]: sync context type is KEYVAL\n"); |
|---|
| 166 | dbp = qop_p->op.coll_p->keyval_db; |
|---|
| 167 | } |
|---|
| 168 | |
|---|
| 169 | if ( ! coll->meta_sync_enabled ) |
|---|
| 170 | { |
|---|
| 171 | int do_sync=0; |
|---|
| 172 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 173 | "[SYNC_COALESCE]: meta sync disabled, " |
|---|
| 174 | "moving operation to completion queue\n"); |
|---|
| 175 | |
|---|
| 176 | ret = dbpf_queued_op_complete(qop_p, OP_COMPLETED); |
|---|
| 177 | |
|---|
| 178 | /* |
|---|
| 179 | * Sync periodical if count < lw or if lw = 0 and count > hw |
|---|
| 180 | */ |
|---|
| 181 | gen_mutex_lock(&sync_context->mutex); |
|---|
| 182 | sync_context->coalesce_counter++; |
|---|
| 183 | if( (coll->c_high_watermark > 0 && |
|---|
| 184 | sync_context->coalesce_counter >= coll->c_high_watermark) |
|---|
| 185 | || sync_context->sync_counter < coll->c_low_watermark ) |
|---|
| 186 | { |
|---|
| 187 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 188 | "[SYNC_COALESCE]:\thigh or low watermark reached:\n" |
|---|
| 189 | "\t\tcoalesced: %d\n\t\tqueued: %d\n", |
|---|
| 190 | sync_context->coalesce_counter, |
|---|
| 191 | sync_context->sync_counter); |
|---|
| 192 | |
|---|
| 193 | sync_context->coalesce_counter = 0; |
|---|
| 194 | do_sync = 1; |
|---|
| 195 | } |
|---|
| 196 | gen_mutex_unlock(&sync_context->mutex); |
|---|
| 197 | |
|---|
| 198 | if ( do_sync ) { |
|---|
| 199 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 200 | "[SYNC_COALESCE]: syncing now!\n"); |
|---|
| 201 | ret = dbpf_sync_db(dbp, sync_context_type, sync_context); |
|---|
| 202 | } |
|---|
| 203 | |
|---|
| 204 | return ret; |
|---|
| 205 | } |
|---|
| 206 | |
|---|
| 207 | /* |
|---|
| 208 | * metadata sync is enabled, either we delay and enqueue this op or we |
|---|
| 209 | * coalesce. |
|---|
| 210 | */ |
|---|
| 211 | gen_mutex_lock(&sync_context->mutex); |
|---|
| 212 | if( (sync_context->sync_counter < coll->c_low_watermark) || |
|---|
| 213 | ( coll->c_high_watermark > 0 && |
|---|
| 214 | sync_context->coalesce_counter >= coll->c_high_watermark ) ) |
|---|
| 215 | { |
|---|
| 216 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 217 | "[SYNC_COALESCE]:\thigh or low watermark reached:\n" |
|---|
| 218 | "\t\tcoalesced: %d\n\t\tqueued: %d\n", |
|---|
| 219 | sync_context->coalesce_counter, |
|---|
| 220 | sync_context->sync_counter); |
|---|
| 221 | |
|---|
| 222 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 223 | "[SYNC_COALESCE]: syncing now!\n"); |
|---|
| 224 | ret = dbpf_sync_db(dbp, sync_context_type, sync_context); |
|---|
| 225 | |
|---|
| 226 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 227 | "[SYNC_COALESCE]: moving op: %p, handle: %s , type: %d " |
|---|
| 228 | "to completion queue\n", |
|---|
| 229 | qop_p, PVFS_handle_to_str(qop_p->op.handle), qop_p->op.type); |
|---|
| 230 | |
|---|
| 231 | if(qop_p->event_type == trove_dbpf_dspace_create_event_id) |
|---|
| 232 | { |
|---|
| 233 | PINT_EVENT_END(qop_p->event_type, dbpf_pid, NULL, qop_p->event_id, |
|---|
| 234 | qop_p->op.u.d_create.out_handle_p); |
|---|
| 235 | } |
|---|
| 236 | else |
|---|
| 237 | { |
|---|
| 238 | PINT_EVENT_END(qop_p->event_type, dbpf_pid, NULL, qop_p->event_id); |
|---|
| 239 | } |
|---|
| 240 | |
|---|
| 241 | DBPF_COMPLETION_START(qop_p, OP_COMPLETED); |
|---|
| 242 | (*outcount)++; |
|---|
| 243 | |
|---|
| 244 | |
|---|
| 245 | /* move remaining ops in queue with ready-to-be-synced state |
|---|
| 246 | * to completion queue |
|---|
| 247 | */ |
|---|
| 248 | while(!dbpf_op_queue_empty(sync_context->sync_queue)) |
|---|
| 249 | { |
|---|
| 250 | ready_op = dbpf_op_queue_shownext(sync_context->sync_queue); |
|---|
| 251 | |
|---|
| 252 | if(ready_op->event_type == trove_dbpf_dspace_create_event_id) |
|---|
| 253 | { |
|---|
| 254 | PINT_EVENT_END(ready_op->event_type, dbpf_pid, NULL, ready_op->event_id, |
|---|
| 255 | ready_op->op.u.d_create.out_handle_p); |
|---|
| 256 | } |
|---|
| 257 | else |
|---|
| 258 | { |
|---|
| 259 | PINT_EVENT_END(ready_op->event_type, dbpf_pid, NULL, ready_op->event_id); |
|---|
| 260 | } |
|---|
| 261 | |
|---|
| 262 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 263 | "[SYNC_COALESCE]: moving op: %p, handle: %s , type: %d " |
|---|
| 264 | "to completion queue\n", |
|---|
| 265 | ready_op, PVFS_handle_to_str(ready_op->op.handle), ready_op->op.type); |
|---|
| 266 | |
|---|
| 267 | dbpf_op_queue_remove(ready_op); |
|---|
| 268 | DBPF_COMPLETION_ADD(ready_op, OP_COMPLETED); |
|---|
| 269 | (*outcount)++; |
|---|
| 270 | } |
|---|
| 271 | |
|---|
| 272 | sync_context->coalesce_counter = 0; |
|---|
| 273 | DBPF_COMPLETION_SIGNAL(); |
|---|
| 274 | DBPF_COMPLETION_FINISH(cid); |
|---|
| 275 | ret = 1; |
|---|
| 276 | } |
|---|
| 277 | else |
|---|
| 278 | { |
|---|
| 279 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 280 | "[SYNC_COALESCE]:\tcoalescing type: %d " |
|---|
| 281 | "coalesce_counter: %d, sync_counter: %d, handle %s\n", |
|---|
| 282 | sync_context_type, sync_context->coalesce_counter, |
|---|
| 283 | sync_context->sync_counter, |
|---|
| 284 | PVFS_handle_to_str(qop_p->op.handle)); |
|---|
| 285 | |
|---|
| 286 | dbpf_op_queue_add( |
|---|
| 287 | sync_context->sync_queue, qop_p); |
|---|
| 288 | sync_context->coalesce_counter++; |
|---|
| 289 | ret = 0; |
|---|
| 290 | } |
|---|
| 291 | |
|---|
| 292 | gen_mutex_unlock(&sync_context->mutex); |
|---|
| 293 | return ret; |
|---|
| 294 | } |
|---|
| 295 | |
|---|
| 296 | int dbpf_sync_coalesce_enqueue(dbpf_queued_op_t *qop_p) |
|---|
| 297 | { |
|---|
| 298 | dbpf_sync_context_t * sync_context; |
|---|
| 299 | int sync_context_type; |
|---|
| 300 | |
|---|
| 301 | if (!DBPF_OP_DOES_SYNC(qop_p->op.type)) |
|---|
| 302 | { |
|---|
| 303 | return 0; |
|---|
| 304 | } |
|---|
| 305 | |
|---|
| 306 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 307 | "[SYNC_COALESCE]: enqueue called\n"); |
|---|
| 308 | |
|---|
| 309 | sync_context_type = dbpf_sync_get_object_sync_context(qop_p->op.type); |
|---|
| 310 | |
|---|
| 311 | sync_context = & sync_array[sync_context_type][qop_p->op.context_id]; |
|---|
| 312 | |
|---|
| 313 | gen_mutex_lock(&sync_context->mutex); |
|---|
| 314 | |
|---|
| 315 | if( (qop_p->op.flags & TROVE_SYNC) ) |
|---|
| 316 | { |
|---|
| 317 | sync_context->sync_counter++; |
|---|
| 318 | } |
|---|
| 319 | else |
|---|
| 320 | { |
|---|
| 321 | sync_context->non_sync_counter++; |
|---|
| 322 | } |
|---|
| 323 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 324 | "[SYNC_COALESCE]: enqueue %d counter sync:%d non_sync:%d\n", |
|---|
| 325 | sync_context_type, |
|---|
| 326 | sync_context->sync_counter, sync_context->non_sync_counter); |
|---|
| 327 | |
|---|
| 328 | gen_mutex_unlock(&sync_context->mutex); |
|---|
| 329 | |
|---|
| 330 | return 0; |
|---|
| 331 | } |
|---|
| 332 | |
|---|
| 333 | int dbpf_sync_coalesce_dequeue( |
|---|
| 334 | dbpf_queued_op_t *qop_p) |
|---|
| 335 | { |
|---|
| 336 | dbpf_sync_context_t * sync_context; |
|---|
| 337 | int sync_context_type; |
|---|
| 338 | |
|---|
| 339 | if (!DBPF_OP_DOES_SYNC(qop_p->op.type)) |
|---|
| 340 | { |
|---|
| 341 | return 0; |
|---|
| 342 | } |
|---|
| 343 | |
|---|
| 344 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 345 | "[SYNC_COALESCE]: dequeue called\n"); |
|---|
| 346 | |
|---|
| 347 | sync_context_type = dbpf_sync_get_object_sync_context(qop_p->op.type); |
|---|
| 348 | |
|---|
| 349 | sync_context = & sync_array[sync_context_type][qop_p->op.context_id]; |
|---|
| 350 | |
|---|
| 351 | gen_mutex_lock(&sync_context->mutex); |
|---|
| 352 | if( (qop_p->op.flags & TROVE_SYNC) ) |
|---|
| 353 | { |
|---|
| 354 | sync_context->sync_counter--; |
|---|
| 355 | } |
|---|
| 356 | else |
|---|
| 357 | { |
|---|
| 358 | sync_context->non_sync_counter--; |
|---|
| 359 | } |
|---|
| 360 | gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG, |
|---|
| 361 | "[SYNC_COALESCE]: dequeue %d counter sync:%d non_sync:%d\n", |
|---|
| 362 | sync_context_type, |
|---|
| 363 | sync_context->sync_counter, sync_context->non_sync_counter); |
|---|
| 364 | |
|---|
| 365 | gen_mutex_unlock(&sync_context->mutex); |
|---|
| 366 | |
|---|
| 367 | return 0; |
|---|
| 368 | } |
|---|
| 369 | |
|---|
| 370 | void dbpf_queued_op_set_sync_high_watermark( |
|---|
| 371 | int high, struct dbpf_collection* coll) |
|---|
| 372 | { |
|---|
| 373 | coll->c_high_watermark = high; |
|---|
| 374 | } |
|---|
| 375 | |
|---|
| 376 | void dbpf_queued_op_set_sync_low_watermark( |
|---|
| 377 | int low, struct dbpf_collection* coll) |
|---|
| 378 | { |
|---|
| 379 | coll->c_low_watermark = low; |
|---|
| 380 | } |
|---|
| 381 | |
|---|
| 382 | void dbpf_queued_op_set_sync_mode(int enabled, struct dbpf_collection* coll) |
|---|
| 383 | { |
|---|
| 384 | /* |
|---|
| 385 | * Right now we don't have to check if there are operations queued in the |
|---|
| 386 | * coalesync queue, because the mode is set on startup... |
|---|
| 387 | */ |
|---|
| 388 | coll->meta_sync_enabled = enabled; |
|---|
| 389 | } |
|---|
| 390 | |
|---|
| 391 | /* |
|---|
| 392 | * Local variables: |
|---|
| 393 | * c-indent-level: 4 |
|---|
| 394 | * c-basic-offset: 4 |
|---|
| 395 | * End: |
|---|
| 396 | * |
|---|
| 397 | * vim: ts=8 sts=4 sw=4 expandtab |
|---|
| 398 | */ |
|---|