root/trunk/src/io/trove/trove-dbpf/dbpf-sync.c @ 7548

Revision 7548, 11.5 KB (checked in by pcarns, 4 years ago)

some extra gossip debugging message, remove misleading comment

Line 
1/*
2 * (C) 2002 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7#include "dbpf-op-queue.h"
8#include "gossip.h"
9#include "pint-perf-counter.h"
10#include "dbpf-sync.h"
11#include "dbpf-thread.h"
12
13enum s_sync_context_e
14{
15    COALESCE_CONTEXT_KEYVAL = 0,
16    COALESCE_CONTEXT_DSPACE = 1,
17    COALESCE_CONTEXT_LAST = 2
18};
19
20static dbpf_sync_context_t
21    sync_array[COALESCE_CONTEXT_LAST][TROVE_MAX_CONTEXTS];
22
23extern dbpf_op_queue_p dbpf_completion_queue_array[TROVE_MAX_CONTEXTS];
24extern gen_mutex_t dbpf_completion_queue_array_mutex[TROVE_MAX_CONTEXTS];
25extern pthread_cond_t dbpf_op_completed_cond;
26
27static int dbpf_sync_db(
28    DB * dbp,
29    enum s_sync_context_e sync_context_type,
30    dbpf_sync_context_t * sync_context)
31{
32    int ret;
33    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
34                 "[SYNC_COALESCE]:\tcoalesce %d sync start "
35                 "in coalesce_queue:%d pending:%d\n",
36                 sync_context_type, sync_context->coalesce_counter,
37                 sync_context->sync_counter);
38    ret = dbp->sync(dbp, 0);
39    if(ret != 0)
40    {
41        gossip_err("db SYNC failed: %s\n",
42                   db_strerror(ret));
43        ret = -dbpf_db_error_to_trove_error(ret);
44        return ret;
45    }
46    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
47                 "[SYNC_COALESCE]:\tcoalesce %d sync stop\n",
48                 sync_context_type);
49    return 0;
50}
51
52static int dbpf_sync_get_object_sync_context(enum dbpf_op_type type)
53{
54    assert(DBPF_OP_IS_KEYVAL(type) || DBPF_OP_IS_DSPACE(type));
55
56    if(DBPF_OP_IS_KEYVAL(type))
57    {
58        return COALESCE_CONTEXT_KEYVAL;
59    }
60    else
61    {
62        return COALESCE_CONTEXT_DSPACE;
63    }
64}
65
66int dbpf_sync_context_init(int context_index)
67{
68    int c;
69    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
70                 "[SYNC_COALESCE]: dbpf_sync_context_init for "
71                 "context %d called\n",
72                 context_index);
73    for(c=0; c < COALESCE_CONTEXT_LAST; c++)
74    {
75        bzero(& sync_array[c][context_index], sizeof(dbpf_sync_context_t));
76
77        gen_mutex_init(&sync_array[c][context_index].mutex);
78        sync_array[c][context_index].sync_queue = dbpf_op_queue_new();
79    }
80
81    return 0;
82}
83
84void dbpf_sync_context_destroy(int context_index)
85{
86    int c;
87    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
88                 "[SYNC_COALESCE]: dbpf_sync_context_destroy for "
89                 "context %d called\n",
90                 context_index);       
91    for(c=0; c < COALESCE_CONTEXT_LAST; c++)
92    {
93        gen_mutex_lock(&sync_array[c][context_index].mutex);
94        gen_mutex_destroy(&sync_array[c][context_index].mutex);
95        dbpf_op_queue_cleanup(sync_array[c][context_index].sync_queue);
96    }
97}
98
99int dbpf_sync_coalesce(dbpf_queued_op_t *qop_p, int retcode, int * outcount)
100{
101
102    int ret = 0;
103    DB * dbp = NULL;
104    dbpf_sync_context_t * sync_context;
105    dbpf_queued_op_t *ready_op;
106    int sync_context_type;
107    struct dbpf_collection* coll = qop_p->op.coll_p;
108    int cid = qop_p->op.context_id;
109
110    /* We want to set the state in all cases
111     */
112    qop_p->state = retcode;
113
114    if(!DBPF_OP_DOES_SYNC(qop_p->op.type))
115    {
116        dbpf_queued_op_complete(qop_p, OP_COMPLETED);
117        (*outcount)++;
118        return 0;
119    }
120
121    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
122                 "[SYNC_COALESCE]: sync_coalesce called, "
123                 "handle: %llu, cid: %d\n",
124                 llu(qop_p->op.handle), cid);
125
126    sync_context_type = dbpf_sync_get_object_sync_context(qop_p->op.type);
127
128    if ( ! (qop_p->op.flags & TROVE_SYNC) ) {
129        /*
130         * No sync needed at all
131         */
132        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
133                     "[SYNC_COALESCE]: sync not needed, "
134                     "moving to completion queue: %llu\n",
135                     llu(qop_p->op.handle));
136        dbpf_queued_op_complete(qop_p, OP_COMPLETED);
137        return 0;
138    }
139
140    /*
141     * Now we know that this particular op is modifying
142     */
143    sync_context = & sync_array[sync_context_type][cid];
144
145    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
146                 "[SYNC_COALESCE]: sync_context: %p\n", sync_context);
147
148    if( sync_context_type == COALESCE_CONTEXT_DSPACE )
149    {
150        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
151                     "[SYNC_COALESCE]: sync context type is DSPACE\n");
152        dbp = qop_p->op.coll_p->ds_db;
153    }
154    else if( sync_context_type == COALESCE_CONTEXT_KEYVAL )
155    {
156        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
157                     "[SYNC_COALESCE]: sync context type is KEYVAL\n");
158        dbp = qop_p->op.coll_p->keyval_db;
159    }
160
161    if ( ! coll->meta_sync_enabled )
162    {
163        int do_sync=0;
164        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
165                     "[SYNC_COALESCE]: meta sync disabled, "
166                     "moving operation to completion queue\n");
167
168        ret = dbpf_queued_op_complete(qop_p, OP_COMPLETED);
169
170        /*
171         * Sync periodical if count < lw or if lw = 0 and count > hw
172         */
173        gen_mutex_lock(&sync_context->mutex);
174        sync_context->coalesce_counter++;
175        if( (coll->c_high_watermark > 0 &&
176             sync_context->coalesce_counter >= coll->c_high_watermark)
177            || sync_context->sync_counter < coll->c_low_watermark )
178        {
179            gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
180                         "[SYNC_COALESCE]:\thigh or low watermark reached:\n"
181                         "\t\tcoalesced: %d\n\t\tqueued: %d\n",
182                         sync_context->coalesce_counter,
183                         sync_context->sync_counter);
184
185            sync_context->coalesce_counter = 0;
186            do_sync = 1;                               
187        }
188        gen_mutex_unlock(&sync_context->mutex);
189
190        if ( do_sync ) {
191            gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
192                         "[SYNC_COALESCE]: syncing now!\n");
193            ret = dbpf_sync_db(dbp, sync_context_type, sync_context);
194        }
195
196        return ret;
197    }
198
199    /*
200     * metadata sync is enabled, either we delay and enqueue this op or we
201     * coalesce.
202     */
203    gen_mutex_lock(&sync_context->mutex);
204    if( (sync_context->sync_counter < coll->c_low_watermark) ||
205        ( coll->c_high_watermark > 0 &&
206          sync_context->coalesce_counter >= coll->c_high_watermark ) )
207    {
208        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
209                     "[SYNC_COALESCE]:\thigh or low watermark reached:\n"
210                     "\t\tcoalesced: %d\n\t\tqueued: %d\n",
211                     sync_context->coalesce_counter,
212                     sync_context->sync_counter);
213
214        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
215                     "[SYNC_COALESCE]: syncing now!\n");
216        ret = dbpf_sync_db(dbp, sync_context_type, sync_context);
217
218        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
219                     "[SYNC_COALESCE]: moving op: %p, handle: %llu , type: %d "
220                     "to completion queue\n",
221                     qop_p, llu(qop_p->op.handle), qop_p->op.type);
222
223        if(qop_p->event_type == trove_dbpf_dspace_create_event_id)
224        {
225            PINT_EVENT_END(qop_p->event_type, dbpf_pid, NULL, qop_p->event_id,
226                           qop_p->op.u.d_create.out_handle_p);
227        }
228        else
229        {
230            PINT_EVENT_END(qop_p->event_type, dbpf_pid, NULL, qop_p->event_id);
231        }
232
233        DBPF_COMPLETION_START(qop_p, OP_COMPLETED);
234        (*outcount)++;
235
236
237        /* move remaining ops in queue with ready-to-be-synced state
238         * to completion queue
239         */
240        while(!dbpf_op_queue_empty(sync_context->sync_queue))
241        {
242            ready_op = dbpf_op_queue_shownext(sync_context->sync_queue);
243
244            if(ready_op->event_type == trove_dbpf_dspace_create_event_id)
245            {
246                PINT_EVENT_END(ready_op->event_type, dbpf_pid, NULL, ready_op->event_id,
247                               ready_op->op.u.d_create.out_handle_p);
248            }
249            else
250            {
251                PINT_EVENT_END(ready_op->event_type, dbpf_pid, NULL, ready_op->event_id);
252            }
253
254            gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
255                         "[SYNC_COALESCE]: moving op: %p, handle: %llu , type: %d "
256                         "to completion queue\n",
257                         ready_op, llu(ready_op->op.handle), ready_op->op.type);
258
259            dbpf_op_queue_remove(ready_op);
260            DBPF_COMPLETION_ADD(ready_op, OP_COMPLETED);
261            (*outcount)++;
262        }
263
264        sync_context->coalesce_counter = 0;
265        DBPF_COMPLETION_SIGNAL();
266        DBPF_COMPLETION_FINISH(cid);
267        ret = 1;
268    }
269    else
270    {
271        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
272                     "[SYNC_COALESCE]:\tcoalescing type: %d "
273                     "coalesce_counter: %d, sync_counter: %d, handle %llu\n",
274                     sync_context_type, sync_context->coalesce_counter,
275                     sync_context->sync_counter,
276                     llu(qop_p->op.handle));
277
278        dbpf_op_queue_add(
279            sync_context->sync_queue, qop_p);
280        sync_context->coalesce_counter++;
281        ret = 0;
282    }
283
284    gen_mutex_unlock(&sync_context->mutex);
285    return ret;
286}
287
288int dbpf_sync_coalesce_enqueue(dbpf_queued_op_t *qop_p)
289{
290    dbpf_sync_context_t * sync_context;
291    int sync_context_type;
292
293    if (!DBPF_OP_DOES_SYNC(qop_p->op.type))
294    {
295        return 0;
296    }
297
298    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
299                 "[SYNC_COALESCE]: enqueue called\n");
300
301    sync_context_type = dbpf_sync_get_object_sync_context(qop_p->op.type);
302
303    sync_context = & sync_array[sync_context_type][qop_p->op.context_id];
304
305    gen_mutex_lock(&sync_context->mutex);
306
307    if( (qop_p->op.flags & TROVE_SYNC) )
308    {
309        sync_context->sync_counter++;
310    }
311    else
312    {
313        sync_context->non_sync_counter++;
314    }
315    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
316                 "[SYNC_COALESCE]: enqueue %d counter sync:%d non_sync:%d\n",
317                 sync_context_type,
318                 sync_context->sync_counter, sync_context->non_sync_counter);
319
320    gen_mutex_unlock(&sync_context->mutex);
321
322    return 0;
323}
324
325int dbpf_sync_coalesce_dequeue(
326    dbpf_queued_op_t *qop_p)
327{
328    dbpf_sync_context_t * sync_context;
329    int sync_context_type;
330
331    if (!DBPF_OP_DOES_SYNC(qop_p->op.type))
332    {
333        return 0;
334    }
335
336    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
337                 "[SYNC_COALESCE]: dequeue called\n");
338
339    sync_context_type = dbpf_sync_get_object_sync_context(qop_p->op.type);
340
341    sync_context = & sync_array[sync_context_type][qop_p->op.context_id];
342
343    gen_mutex_lock(&sync_context->mutex);
344    if( (qop_p->op.flags & TROVE_SYNC) )
345    {
346        sync_context->sync_counter--;
347    }
348    else
349    {
350        sync_context->non_sync_counter--;
351    }
352    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
353                 "[SYNC_COALESCE]: dequeue %d counter sync:%d non_sync:%d\n",
354                 sync_context_type,
355                 sync_context->sync_counter, sync_context->non_sync_counter);
356
357    gen_mutex_unlock(&sync_context->mutex);
358
359    return 0;
360}
361
362void dbpf_queued_op_set_sync_high_watermark(
363    int high, struct dbpf_collection* coll)
364{
365    coll->c_high_watermark = high;
366}
367
368void dbpf_queued_op_set_sync_low_watermark(
369    int low, struct dbpf_collection* coll)
370{
371    coll->c_low_watermark = low;
372}
373
374void dbpf_queued_op_set_sync_mode(int enabled, struct dbpf_collection* coll)
375{
376    /*
377     * Right now we don't have to check if there are operations queued in the
378     * coalesync queue, because the mode is set on startup...
379     */
380    coll->meta_sync_enabled = enabled;
381}
382
383/*
384 * Local variables:
385 *  c-indent-level: 4
386 *  c-basic-offset: 4
387 * End:
388 *
389 * vim: ts=8 sts=4 sw=4 expandtab
390 */
Note: See TracBrowser for help on using the browser.