root/branches/Orange-Branch/src/io/trove/trove-dbpf/dbpf-sync.c @ 8600

Revision 8600, 11.8 KB (checked in by bligon, 3 years ago)

In function dbpf_sync_context_destroy(), helgrind was complaining that we shouldn't
try to destory a mutex after it's been locked. So, I added an unlock before the
destroy.

Line 
1/*
2 * (C) 2002 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7#include "dbpf-op-queue.h"
8#include "gossip.h"
9#include "pint-perf-counter.h"
10#include "dbpf-sync.h"
11#include "dbpf-thread.h"
12
13enum s_sync_context_e
14{
15    COALESCE_CONTEXT_KEYVAL = 0,
16    COALESCE_CONTEXT_DSPACE = 1,
17    COALESCE_CONTEXT_LAST = 2
18};
19
20static dbpf_sync_context_t
21    sync_array[COALESCE_CONTEXT_LAST][TROVE_MAX_CONTEXTS];
22
23extern dbpf_op_queue_p dbpf_completion_queue_array[TROVE_MAX_CONTEXTS];
24extern gen_mutex_t dbpf_completion_queue_array_mutex[TROVE_MAX_CONTEXTS];
25extern pthread_cond_t dbpf_op_completed_cond;
26
27static int dbpf_sync_db(
28    DB * dbp,
29    enum s_sync_context_e sync_context_type,
30    dbpf_sync_context_t * sync_context)
31{
32    int ret;
33    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
34                 "[SYNC_COALESCE]:\tcoalesce %d sync start "
35                 "in coalesce_queue:%d pending:%d\n",
36                 sync_context_type, sync_context->coalesce_counter,
37                 sync_context->sync_counter);
38    ret = dbp->sync(dbp, 0);
39    if(ret != 0)
40    {
41        gossip_err("db SYNC failed: %s\n",
42                   db_strerror(ret));
43        ret = -dbpf_db_error_to_trove_error(ret);
44        return ret;
45    }
46    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
47                 "[SYNC_COALESCE]:\tcoalesce %d sync stop\n",
48                 sync_context_type);
49    return 0;
50}
51
52static int dbpf_sync_get_object_sync_context(enum dbpf_op_type type)
53{
54    assert(DBPF_OP_IS_KEYVAL(type) || DBPF_OP_IS_DSPACE(type));
55
56    if(DBPF_OP_IS_KEYVAL(type))
57    {
58        return COALESCE_CONTEXT_KEYVAL;
59    }
60    else
61    {
62        return COALESCE_CONTEXT_DSPACE;
63    }
64}
65
66int dbpf_sync_context_init(int context_index)
67{
68    int c;
69    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
70                 "[SYNC_COALESCE]: dbpf_sync_context_init for "
71                 "context %d called\n",
72                 context_index);
73    for(c=0; c < COALESCE_CONTEXT_LAST; c++)
74    {
75        bzero(& sync_array[c][context_index], sizeof(dbpf_sync_context_t));
76
77        gen_mutex_init(&sync_array[c][context_index].mutex);
78        sync_array[c][context_index].sync_queue = dbpf_op_queue_new();
79    }
80
81    return 0;
82}
83
84void dbpf_sync_context_destroy(int context_index)
85{
86    int c;
87    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
88                 "[SYNC_COALESCE]: dbpf_sync_context_destroy for "
89                 "context %d called\n",
90                 context_index);       
91    for(c=0; c < COALESCE_CONTEXT_LAST; c++)
92    {
93        /* grab lock...should be the last one, since we are shutting down */
94        gen_mutex_lock(&sync_array[c][context_index].mutex);
95     
96        /* we have to unlock the mutex before we can destroy it */
97        gen_mutex_unlock(&sync_array[c][context_index].mutex);
98
99        /* destroy the mutex */
100        gen_mutex_destroy(&sync_array[c][context_index].mutex);
101
102        /* cleanup the op queue */
103        dbpf_op_queue_cleanup(sync_array[c][context_index].sync_queue);
104    }
105}
106
107int dbpf_sync_coalesce(dbpf_queued_op_t *qop_p, int retcode, int * outcount)
108{
109
110    int ret = 0;
111    DB * dbp = NULL;
112    dbpf_sync_context_t * sync_context;
113    dbpf_queued_op_t *ready_op;
114    int sync_context_type;
115    struct dbpf_collection* coll = qop_p->op.coll_p;
116    int cid = qop_p->op.context_id;
117
118    /* We want to set the state in all cases
119     */
120    qop_p->state = retcode;
121
122    if(!DBPF_OP_DOES_SYNC(qop_p->op.type))
123    {
124        dbpf_queued_op_complete(qop_p, OP_COMPLETED);
125        (*outcount)++;
126        return 0;
127    }
128
129    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
130                 "[SYNC_COALESCE]: sync_coalesce called, "
131                 "handle: %llu, cid: %d\n",
132                 llu(qop_p->op.handle), cid);
133
134    sync_context_type = dbpf_sync_get_object_sync_context(qop_p->op.type);
135
136    if ( ! (qop_p->op.flags & TROVE_SYNC) ) {
137        /*
138         * No sync needed at all
139         */
140        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
141                     "[SYNC_COALESCE]: sync not needed, "
142                     "moving to completion queue: %llu\n",
143                     llu(qop_p->op.handle));
144        dbpf_queued_op_complete(qop_p, OP_COMPLETED);
145        return 0;
146    }
147
148    /*
149     * Now we know that this particular op is modifying
150     */
151    sync_context = & sync_array[sync_context_type][cid];
152
153    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
154                 "[SYNC_COALESCE]: sync_context: %p\n", sync_context);
155
156    if( sync_context_type == COALESCE_CONTEXT_DSPACE )
157    {
158        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
159                     "[SYNC_COALESCE]: sync context type is DSPACE\n");
160        dbp = qop_p->op.coll_p->ds_db;
161    }
162    else if( sync_context_type == COALESCE_CONTEXT_KEYVAL )
163    {
164        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
165                     "[SYNC_COALESCE]: sync context type is KEYVAL\n");
166        dbp = qop_p->op.coll_p->keyval_db;
167    }
168
169    if ( ! coll->meta_sync_enabled )
170    {
171        int do_sync=0;
172        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
173                     "[SYNC_COALESCE]: meta sync disabled, "
174                     "moving operation to completion queue\n");
175
176        ret = dbpf_queued_op_complete(qop_p, OP_COMPLETED);
177
178        /*
179         * Sync periodical if count < lw or if lw = 0 and count > hw
180         */
181        gen_mutex_lock(&sync_context->mutex);
182        sync_context->coalesce_counter++;
183        if( (coll->c_high_watermark > 0 &&
184             sync_context->coalesce_counter >= coll->c_high_watermark)
185            || sync_context->sync_counter < coll->c_low_watermark )
186        {
187            gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
188                         "[SYNC_COALESCE]:\thigh or low watermark reached:\n"
189                         "\t\tcoalesced: %d\n\t\tqueued: %d\n",
190                         sync_context->coalesce_counter,
191                         sync_context->sync_counter);
192
193            sync_context->coalesce_counter = 0;
194            do_sync = 1;                               
195        }
196        gen_mutex_unlock(&sync_context->mutex);
197
198        if ( do_sync ) {
199            gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
200                         "[SYNC_COALESCE]: syncing now!\n");
201            ret = dbpf_sync_db(dbp, sync_context_type, sync_context);
202        }
203
204        return ret;
205    }
206
207    /*
208     * metadata sync is enabled, either we delay and enqueue this op or we
209     * coalesce.
210     */
211    gen_mutex_lock(&sync_context->mutex);
212    if( (sync_context->sync_counter < coll->c_low_watermark) ||
213        ( coll->c_high_watermark > 0 &&
214          sync_context->coalesce_counter >= coll->c_high_watermark ) )
215    {
216        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
217                     "[SYNC_COALESCE]:\thigh or low watermark reached:\n"
218                     "\t\tcoalesced: %d\n\t\tqueued: %d\n",
219                     sync_context->coalesce_counter,
220                     sync_context->sync_counter);
221
222        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
223                     "[SYNC_COALESCE]: syncing now!\n");
224        ret = dbpf_sync_db(dbp, sync_context_type, sync_context);
225
226        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
227                     "[SYNC_COALESCE]: moving op: %p, handle: %llu , type: %d "
228                     "to completion queue\n",
229                     qop_p, llu(qop_p->op.handle), qop_p->op.type);
230
231        if(qop_p->event_type == trove_dbpf_dspace_create_event_id)
232        {
233            PINT_EVENT_END(qop_p->event_type, dbpf_pid, NULL, qop_p->event_id,
234                           qop_p->op.u.d_create.out_handle_p);
235        }
236        else
237        {
238            PINT_EVENT_END(qop_p->event_type, dbpf_pid, NULL, qop_p->event_id);
239        }
240
241        DBPF_COMPLETION_START(qop_p, OP_COMPLETED);
242        (*outcount)++;
243
244
245        /* move remaining ops in queue with ready-to-be-synced state
246         * to completion queue
247         */
248        while(!dbpf_op_queue_empty(sync_context->sync_queue))
249        {
250            ready_op = dbpf_op_queue_shownext(sync_context->sync_queue);
251
252            if(ready_op->event_type == trove_dbpf_dspace_create_event_id)
253            {
254                PINT_EVENT_END(ready_op->event_type, dbpf_pid, NULL, ready_op->event_id,
255                               ready_op->op.u.d_create.out_handle_p);
256            }
257            else
258            {
259                PINT_EVENT_END(ready_op->event_type, dbpf_pid, NULL, ready_op->event_id);
260            }
261
262            gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
263                         "[SYNC_COALESCE]: moving op: %p, handle: %llu , type: %d "
264                         "to completion queue\n",
265                         ready_op, llu(ready_op->op.handle), ready_op->op.type);
266
267            dbpf_op_queue_remove(ready_op);
268            DBPF_COMPLETION_ADD(ready_op, OP_COMPLETED);
269            (*outcount)++;
270        }
271
272        sync_context->coalesce_counter = 0;
273        DBPF_COMPLETION_SIGNAL();
274        DBPF_COMPLETION_FINISH(cid);
275        ret = 1;
276    }
277    else
278    {
279        gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
280                     "[SYNC_COALESCE]:\tcoalescing type: %d "
281                     "coalesce_counter: %d, sync_counter: %d, handle %llu\n",
282                     sync_context_type, sync_context->coalesce_counter,
283                     sync_context->sync_counter,
284                     llu(qop_p->op.handle));
285
286        dbpf_op_queue_add(
287            sync_context->sync_queue, qop_p);
288        sync_context->coalesce_counter++;
289        ret = 0;
290    }
291
292    gen_mutex_unlock(&sync_context->mutex);
293    return ret;
294}
295
296int dbpf_sync_coalesce_enqueue(dbpf_queued_op_t *qop_p)
297{
298    dbpf_sync_context_t * sync_context;
299    int sync_context_type;
300
301    if (!DBPF_OP_DOES_SYNC(qop_p->op.type))
302    {
303        return 0;
304    }
305
306    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
307                 "[SYNC_COALESCE]: enqueue called\n");
308
309    sync_context_type = dbpf_sync_get_object_sync_context(qop_p->op.type);
310
311    sync_context = & sync_array[sync_context_type][qop_p->op.context_id];
312
313    gen_mutex_lock(&sync_context->mutex);
314
315    if( (qop_p->op.flags & TROVE_SYNC) )
316    {
317        sync_context->sync_counter++;
318    }
319    else
320    {
321        sync_context->non_sync_counter++;
322    }
323    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
324                 "[SYNC_COALESCE]: enqueue %d counter sync:%d non_sync:%d\n",
325                 sync_context_type,
326                 sync_context->sync_counter, sync_context->non_sync_counter);
327
328    gen_mutex_unlock(&sync_context->mutex);
329
330    return 0;
331}
332
333int dbpf_sync_coalesce_dequeue(
334    dbpf_queued_op_t *qop_p)
335{
336    dbpf_sync_context_t * sync_context;
337    int sync_context_type;
338
339    if (!DBPF_OP_DOES_SYNC(qop_p->op.type))
340    {
341        return 0;
342    }
343
344    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
345                 "[SYNC_COALESCE]: dequeue called\n");
346
347    sync_context_type = dbpf_sync_get_object_sync_context(qop_p->op.type);
348
349    sync_context = & sync_array[sync_context_type][qop_p->op.context_id];
350
351    gen_mutex_lock(&sync_context->mutex);
352    if( (qop_p->op.flags & TROVE_SYNC) )
353    {
354        sync_context->sync_counter--;
355    }
356    else
357    {
358        sync_context->non_sync_counter--;
359    }
360    gossip_debug(GOSSIP_DBPF_COALESCE_DEBUG,
361                 "[SYNC_COALESCE]: dequeue %d counter sync:%d non_sync:%d\n",
362                 sync_context_type,
363                 sync_context->sync_counter, sync_context->non_sync_counter);
364
365    gen_mutex_unlock(&sync_context->mutex);
366
367    return 0;
368}
369
370void dbpf_queued_op_set_sync_high_watermark(
371    int high, struct dbpf_collection* coll)
372{
373    coll->c_high_watermark = high;
374}
375
376void dbpf_queued_op_set_sync_low_watermark(
377    int low, struct dbpf_collection* coll)
378{
379    coll->c_low_watermark = low;
380}
381
382void dbpf_queued_op_set_sync_mode(int enabled, struct dbpf_collection* coll)
383{
384    /*
385     * Right now we don't have to check if there are operations queued in the
386     * coalesync queue, because the mode is set on startup...
387     */
388    coll->meta_sync_enabled = enabled;
389}
390
391/*
392 * Local variables:
393 *  c-indent-level: 4
394 *  c-basic-offset: 4
395 * End:
396 *
397 * vim: ts=8 sts=4 sw=4 expandtab
398 */
Note: See TracBrowser for help on using the browser.