root/branches/pvfs-2-8-branch/src/io/job/job.c @ 8320

Revision 8320, 179.5 KB (checked in by pcarns, 3 years ago)

merge fix for precreate bug reported by Bart Taylor from trunk to 2-8 branch

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/* this file contains a skeleton implementation of the job interface */
8
9#include <errno.h>
10#include <sys/time.h>
11#include <unistd.h>
12#include <stdio.h>
13#include <limits.h>
14#include <string.h>
15#include <assert.h>
16#include <time.h>
17
18#include "state-machine.h"
19#include "job.h"
20#include "job-desc-queue.h"
21#include "gen-locks.h"
22#include "bmi.h"
23#include "trove.h"
24#include "gossip.h"
25#include "id-generator.h"
26#include "job-time-mgr.h"
27#include "pvfs2-internal.h"
28
/* contexts for use within the job interface; both start out as -1 and are
 * filled in by job_initialize() from the corresponding thread managers */
static bmi_context_id global_bmi_context = -1;
#ifdef __PVFS2_TROVE_SUPPORT__
static TROVE_context_id global_trove_context = -1;
#endif

/* queues of pending jobs */
/* one completion queue per open job context, indexed by context id; a NULL
 * slot is free to be claimed by job_open_context() */
static job_desc_q_p completion_queue_array[JOB_MAX_CONTEXTS] = {NULL};
static int completion_error = 0;
/* unexpected BMI receive jobs still waiting for a message, and their count
 * (both touched under bmi_unexp_mutex) */
static job_desc_q_p bmi_unexp_queue = NULL;
static int bmi_unexp_pending_count = 0;
/* bmi_pending_count is bumped for every BMI job that did not complete
 * immediately; the trove/flow counters are presumably analogous (their
 * users are not visible in this section).
 * NOTE(review): the job_bmi_* post functions increment bmi_pending_count
 * without holding any lock -- confirm that this is safe */
static int bmi_pending_count = 0;
static int trove_pending_count = 0;
static int flow_pending_count = 0;
/* unexpected device jobs still waiting for a message, and their count
 * (both touched under dev_unexp_mutex) */
static job_desc_q_p dev_unexp_queue = NULL;
static int dev_unexp_pending_count = 0;
/* locks for internal queues */
static gen_mutex_t bmi_unexp_mutex = GEN_MUTEX_INITIALIZER;
static gen_mutex_t dev_unexp_mutex = GEN_MUTEX_INITIALIZER;
static gen_mutex_t completion_mutex = GEN_MUTEX_INITIALIZER;

/* set by job_initialize(), cleared by job_finalize(); both do so while
 * holding initialized_mutex */
static int initialized = 0;
static gen_mutex_t initialized_mutex = GEN_MUTEX_INITIALIZER;

#ifdef __PVFS2_JOB_THREADED__
/* signaled while holding completion_mutex to wake up anyone waiting for
 * job completion (see e.g. job_bmi_unexp_cancel()) */
static pthread_cond_t completion_cond = PTHREAD_COND_INITIALIZER;
#endif /* __PVFS2_JOB_THREADED__ */

/* tuning constants:
 * job_work_metric     - number of jobs to test for at once inside of
 *                       do_one_work_cycle()
 * thread_wait_timeout - wait timeout, in microseconds
 */
enum
{
    job_work_metric = 5,
    thread_wait_timeout = 10000        /* usecs */
};

/* cap how many keys we dump into trove at once when filling precreate pools
 * so that it doesn't clog up trove queues
 */
#define PRECREATE_POOL_MAX_KEYS 32
68
#ifdef __PVFS2_TROVE_SUPPORT__

/* NOTE(review): presumably protects the three precreate pool lists below;
 * their users are not visible in this section -- confirm */
static gen_mutex_t precreate_pool_mutex = GEN_MUTEX_INITIALIZER;
static QLIST_HEAD(precreate_pool_check_level_list);
static QLIST_HEAD(precreate_pool_get_handles_list);
static QLIST_HEAD(precreate_pool_fs_list);

/* one pool of precreated handles, linked into a list via list_link.
 * NOTE(review): presumably one pool per remote host, where pool_handle is
 * the trove object holding the handles and pool_count is how many it
 * currently holds -- confirm against the fill/get code */
struct precreate_pool
{
    struct qlist_head list_link;
    char* host;
    PVFS_handle pool_handle;
    uint32_t pool_count;
};

/* collection of precreate pools belonging to one file system (keyed by
 * fsid; see find_fs()) */
struct fs_pool
{
    struct qlist_head list_link;
    PVFS_fs_id fsid;
    struct qlist_head precreate_pool_list;
    /* NOTE(review): role not visible here; looks like a pointer into
     * precreate_pool_list (e.g. where a search starts) -- confirm */
    struct qlist_head* precreate_pool_initial;
};

/* per-request state for pulling handles out of a precreate pool via trove */
struct precreate_pool_get_trove
{
    struct job_desc* jd; /* parent job descriptor */
    /* variables needed per keyval_iterate_keys() call */
    PVFS_ds_position pos;
    PVFS_ds_keyval key;
    int count;
    struct PINT_thread_mgr_trove_callback trove_callback;
    struct precreate_pool* pool;
};
#endif /* __PVFS2_TROVE_SUPPORT__ */
103
/********************************************************
 * function prototypes
 */

/* internal queue setup/teardown (used by job_initialize()/job_finalize()) */
static int setup_queues(void);
static void teardown_queues(void);
static int do_one_test_cycle_req_sched(void);
/* completion reporting helpers */
static void fill_status(struct job_desc *jd,
                        void **returned_user_ptr_p,
                        job_status_s * status);
static int completion_query_some(job_id_t * id_array,
                                 int *inout_count_p,
                                 int *out_index_array,
                                 void **returned_user_ptr_array,
                                 job_status_s * out_status_array_p);
static int completion_query_context(job_id_t * out_id_array_p,
                                  int *inout_count_p,
                                  void **returned_user_ptr_array,
                                  job_status_s *
                                  out_status_array_p,
                                  job_context_id context_id);
/* callbacks and unexpected-message handlers; the BMI and device ones are
 * handed to the thread managers by the job_bmi_* / job_dev_unexp() posts */
static void bmi_thread_mgr_callback(void* data,
    PVFS_size actual_size,
    PVFS_error error_code);
static void bmi_thread_mgr_unexp_handler(struct BMI_unexpected_info* unexp);
static void dev_thread_mgr_unexp_handler(struct PINT_dev_unexp_info* unexp);
static void trove_thread_mgr_callback(void* data,
    PVFS_error error_code);
static void flow_callback(flow_descriptor* flow_d, int cancel_path);
#ifndef __PVFS2_JOB_THREADED__
/* only needed when the job interface is not built threaded */
static gen_mutex_t work_cycle_mutex = GEN_MUTEX_INITIALIZER;
static void do_one_work_cycle_all(int idle_time_ms);
#endif
#ifdef __PVFS2_TROVE_SUPPORT__
/* precreate pool helpers (trove-enabled builds only) */
static void precreate_pool_get_thread_mgr_callback(
    void* data,
    PVFS_error error_code);
static void precreate_pool_get_thread_mgr_callback_unlocked(
    void* data,
    PVFS_error error_code);
static void precreate_pool_fill_thread_mgr_callback(
    void* data,
    PVFS_error error_code);
static void precreate_pool_iterate_callback(
    void* data,
    PVFS_error error_code);
static void precreate_pool_get_handles_try_post(struct job_desc* jd);
static struct fs_pool* find_fs(PVFS_fs_id fsid);
#endif
153
154/********************************************************
155 * public interface
156 */
157
158/* job_initialize()
159 *
160 * start up the job interface
161 *
162 * returns 0 on success, -errno on failure
163 */
164int job_initialize(int flags)
165{
166    int ret = -1;
167
168    ret = setup_queues();
169    if (ret < 0)
170    {
171        return (ret);
172    }
173
174    /* startup threads */
175    ret = PINT_thread_mgr_bmi_start();
176    if (ret != 0)
177    {
178        teardown_queues();
179        return (-ret);
180    }
181    ret = PINT_thread_mgr_bmi_getcontext((PVFS_context_id *)&global_bmi_context);
182    /* this should never fail if the thread startup succeeded */
183    assert(ret == 0);
184
185#ifdef __PVFS2_CLIENT__
186    ret = PINT_thread_mgr_dev_start();
187    if (ret != 0)
188    {
189        PINT_thread_mgr_bmi_stop();
190        teardown_queues();
191        return(-ret);
192    }
193#endif
194
195#ifdef __PVFS2_TROVE_SUPPORT__
196    ret = PINT_thread_mgr_trove_start();
197    if(ret != 0)
198    {
199        PINT_thread_mgr_bmi_stop();
200#ifdef __PVFS2_CLIENT__
201        PINT_thread_mgr_dev_stop();
202#endif
203        teardown_queues();
204        return (-ret);
205    }
206    ret = PINT_thread_mgr_trove_getcontext(&global_trove_context);
207    /* this should never fail if thread startup succeeded */
208    assert(ret == 0);
209#endif
210
211    id_gen_safe_initialize();
212
213    gen_mutex_lock(&initialized_mutex);
214    initialized = 1;
215    gen_mutex_unlock(&initialized_mutex);
216
217    return (0);
218}
219
220/* job_finalize()
221 *
222 * shuts down the job interface
223 *
224 * returns 0 on success, -errno on failure
225 */
226int job_finalize(void)
227{
228    gen_mutex_lock(&initialized_mutex);
229    initialized = 0;
230    gen_mutex_unlock(&initialized_mutex);
231
232    id_gen_safe_finalize();
233
234    PINT_thread_mgr_bmi_stop();
235#ifdef __PVFS2_CLIENT__
236    PINT_thread_mgr_dev_stop();
237#endif
238#ifdef __PVFS2_TROVE_SUPPORT__
239    PINT_thread_mgr_trove_stop();
240#endif
241    teardown_queues();
242    return 0;
243}
244
245
246/* job_open_context()
247 *
248 * opens a new context for the job interface
249 *
250 * returns 0 on success, -errno on failure
251 */
252int job_open_context(job_context_id* context_id)
253{
254    int context_index;
255
256    /* find an unused context id */
257    gen_mutex_lock(&completion_mutex);
258    for(context_index=0; context_index<JOB_MAX_CONTEXTS; context_index++)
259    {
260        if(completion_queue_array[context_index] == NULL)
261        {
262            break;
263        }
264    }
265
266    if(context_index >= JOB_MAX_CONTEXTS)
267    {
268        /* we don't have any more available! */
269        gen_mutex_unlock(&completion_mutex);
270        return(-EBUSY);
271    }
272
273    /* create a new completion queue for the context */
274    completion_queue_array[context_index] = job_desc_q_new();
275    if(!completion_queue_array[context_index])
276    {
277        gen_mutex_unlock(&completion_mutex);
278        return(-ENOMEM);
279    }
280    gen_mutex_unlock(&completion_mutex);
281
282    *context_id = context_index;
283    return(0);
284}
285
286
287/* job_close_context()
288 *
289 * destroys a context previously created with job_open_context()
290 *
291 * no return value
292 */
293void job_close_context(job_context_id context_id)
294{
295    gen_mutex_lock(&completion_mutex);
296    if(!completion_queue_array[context_id])
297    {
298        gen_mutex_unlock(&completion_mutex);
299        return;
300    }
301
302    job_desc_q_cleanup(completion_queue_array[context_id]);
303
304    completion_queue_array[context_id] = NULL;
305
306    gen_mutex_unlock(&completion_mutex);
307    return;
308}
309
310/* job_reset_timeout()
311 *
312 * resets the timeout associated with a job that has already been posted but
313 * has not yet completed
314 *
315 * returns 0 on success, -PVFS_errno on failure
316 */
317int job_reset_timeout(job_id_t id, int timeout_sec)
318{
319    struct job_desc* query = NULL;
320    int ret = -1;
321
322    /* lock completion queue to make sure that a concurrent test call
323     * doesn't pull the job out from under us somehow
324     */
325    gen_mutex_lock(&completion_mutex);
326
327    query = id_gen_safe_lookup(id);
328    if(!query)
329    {       
330        /* this id is not valid */
331        gen_mutex_unlock(&completion_mutex);
332        return(-PVFS_EINVAL);
333    }
334
335    if(query->type != JOB_BMI && query->type != JOB_FLOW)
336    {
337        /* trying to reset timeouts on a job that doesn't support the
338         * concept
339         */
340        gen_mutex_unlock(&completion_mutex);
341        return(-PVFS_EINVAL);
342    }
343
344    /* pull the job out of the time mgr (thereby clearing old timer) */
345    job_time_mgr_rem(query);
346
347    /* put it back into the time mgr with new value */
348    ret = job_time_mgr_add(query, timeout_sec);
349
350    gen_mutex_unlock(&completion_mutex);
351
352    return(ret);
353}
354
355
356/* job_bmi_send()
357 *
358 * posts a job to send a BMI message
359 *
360 * returns 0 on success, 1 on immediate completion, and -errno on
361 * failure
362 */
363int job_bmi_send(PVFS_BMI_addr_t addr,
364                 void *buffer,
365                 bmi_size_t size,
366                 bmi_msg_tag_t tag,
367                 enum bmi_buffer_type buffer_type,
368                 int send_unexpected,
369                 void *user_ptr,
370                 job_aint status_user_tag,
371                 job_status_s * out_status_p,
372                 job_id_t * id,
373                 job_context_id context_id,
374                 int timeout_sec,
375                 PVFS_hint hints)
376{
377    /* post a bmi send.  If it completes (or fails) immediately, then
378     * return and fill in the status structure.  If it needs to be tested
379     * for completion later, then queue up a job_desc structure.
380     */
381
382    int ret = -1;
383    struct job_desc *jd = NULL;
384    void* user_ptr_internal = NULL;
385
386    /* create the job desc first, even though we may not use it.  This
387     * gives us somewhere to store the BMI id and user ptr
388     */
389    jd = alloc_job_desc(JOB_BMI);
390    if (!jd)
391    {
392        out_status_p->error_code = -PVFS_ERROR_CODE(errno);
393        return 1;
394    }
395    jd->job_user_ptr = user_ptr;
396    jd->u.bmi.actual_size = size;
397    jd->context_id = context_id;
398    jd->status_user_tag = status_user_tag;
399    jd->bmi_callback.fn = bmi_thread_mgr_callback;
400    jd->bmi_callback.data = (void*)jd;
401    user_ptr_internal = &jd->bmi_callback;
402
403    jd->hints = hints;
404
405    /* post appropriate type of send */
406    if (!send_unexpected)
407    {
408        ret = BMI_post_send(&(jd->u.bmi.id), addr, buffer, size,
409                            buffer_type, tag, user_ptr_internal,
410                            global_bmi_context, jd->hints);
411    }
412    else
413    {
414        ret = BMI_post_sendunexpected(&(jd->u.bmi.id), addr,
415                                      buffer, size, buffer_type, tag,
416                                      user_ptr_internal, global_bmi_context,
417                                      jd->hints);
418    }
419
420    if (ret < 0)
421    {
422        /* error posting */
423        out_status_p->error_code = ret;
424        out_status_p->status_user_tag = status_user_tag;
425        dealloc_job_desc(jd);
426        jd = NULL;
427        return (1);
428    }
429
430    if (ret == 1)
431    {
432        /* immediate completion */
433        out_status_p->error_code = 0;
434        out_status_p->status_user_tag = status_user_tag;
435        out_status_p->actual_size = size;
436        dealloc_job_desc(jd);
437        jd = NULL;
438        return (ret);
439    }
440
441    /* if we fall to this point, the job did not immediately complete and
442     * we must queue up to test it later
443     */
444    *id = jd->job_id;
445    bmi_pending_count++;
446
447    return(job_time_mgr_add(jd, timeout_sec));
448}
449
450
451/* job_bmi_send_list()
452 *
453 * posts a job to send a BMI list message
454 *
455 * returns 0 on success, 1 on immediate completion, and -errno on
456 * failure
457 */
458int job_bmi_send_list(PVFS_BMI_addr_t addr,
459                      void **buffer_list,
460                      bmi_size_t * size_list,
461                      int list_count,
462                      bmi_size_t total_size,
463                      bmi_msg_tag_t tag,
464                      enum bmi_buffer_type buffer_type,
465                      int send_unexpected,
466                      void *user_ptr,
467                      job_aint status_user_tag,
468                      job_status_s * out_status_p,
469                      job_id_t * id,
470                      job_context_id context_id,
471                      int timeout_sec,
472                      PVFS_hint hints)
473{
474    /* post a bmi send.  If it completes (or fails) immediately, then
475     * return and fill in the status structure.  If it needs to be tested
476     * for completion later, then queue up a job_desc structure.
477     */
478
479    int ret = -1;
480    struct job_desc *jd = NULL;
481    void* user_ptr_internal = NULL;
482
483    /* create the job desc first, even though we may not use it.  This
484     * gives us somewhere to store the BMI id and user ptr
485     */
486    jd = alloc_job_desc(JOB_BMI);
487    if (!jd)
488    {
489        out_status_p->error_code = -PVFS_ERROR_CODE(errno);
490        return 1;
491    }
492    jd->job_user_ptr = user_ptr;
493    jd->u.bmi.actual_size = total_size;
494    jd->context_id = context_id;
495    jd->status_user_tag = status_user_tag;
496    jd->bmi_callback.fn = bmi_thread_mgr_callback;
497    jd->bmi_callback.data = (void*)jd;
498    user_ptr_internal = &jd->bmi_callback;
499
500    jd->hints = hints;
501
502    /* post appropriate type of send */
503    if (!send_unexpected)
504    {
505        ret = BMI_post_send_list(&(jd->u.bmi.id), addr,
506                                 (const void **) buffer_list, size_list,
507                                 list_count, total_size, buffer_type,
508                                 tag, user_ptr_internal, global_bmi_context, hints);
509    }
510    else
511    {
512        ret = BMI_post_sendunexpected_list(&(jd->u.bmi.id), addr,
513                                           (const void **) buffer_list,
514                                           size_list, list_count,
515                                           total_size, buffer_type, tag,
516                                           user_ptr_internal, global_bmi_context, hints);
517    }
518
519    if (ret < 0)
520    {
521        /* error posting */
522        out_status_p->error_code = ret;
523        out_status_p->status_user_tag = status_user_tag;
524        dealloc_job_desc(jd);
525        jd = NULL;
526        return (1);
527    }
528
529    if (ret == 1)
530    {
531        /* immediate completion */
532        out_status_p->error_code = 0;
533        out_status_p->status_user_tag = status_user_tag;
534        out_status_p->actual_size = total_size;
535        dealloc_job_desc(jd);
536        jd = NULL;
537        return (ret);
538    }
539
540    /* if we fall to this point, the job did not immediately complete and
541     * we must queue up to test it later
542     */
543    *id = jd->job_id;
544    bmi_pending_count++;
545    return(job_time_mgr_add(jd, timeout_sec));
546}
547
548/* job_bmi_recv()
549 *
550 * posts a job to receive a BMI message
551 *
552 * returns 0 on success, 1 on immediate completion, and -errno on
553 * failure
554 */
555int job_bmi_recv(PVFS_BMI_addr_t addr,
556                 void *buffer,
557                 bmi_size_t size,
558                 bmi_msg_tag_t tag,
559                 enum bmi_buffer_type buffer_type,
560                 void *user_ptr,
561                 job_aint status_user_tag,
562                 job_status_s * out_status_p,
563                 job_id_t * id,
564                 job_context_id context_id,
565                 int timeout_sec,
566                 PVFS_hint hints)
567{
568    /* post a bmi recv.  If it completes (or fails) immediately, then
569     * return and fill in the status structure.  If it needs to be tested
570     * for completion later, then queue up a job_desc structure.
571     */
572    int ret = -1;
573    struct job_desc *jd = NULL;
574    void* user_ptr_internal = NULL;
575
576    /* create the job desc first, even though we may not use it.  This
577     * gives us somewhere to store the BMI id and user ptr
578     */
579    jd = alloc_job_desc(JOB_BMI);
580    if (!jd)
581    {
582        out_status_p->error_code = -PVFS_ENOMEM;
583        return 1;
584    }
585    jd->hints = hints;
586    jd->job_user_ptr = user_ptr;
587    jd->context_id = context_id;
588    jd->status_user_tag = status_user_tag;
589    jd->bmi_callback.fn = bmi_thread_mgr_callback;
590    jd->bmi_callback.data = (void*)jd;
591    user_ptr_internal = &jd->bmi_callback;
592
593
594    ret = BMI_post_recv(&(jd->u.bmi.id), addr, buffer, size,
595                        &(jd->u.bmi.actual_size), buffer_type, tag,
596                        user_ptr_internal,
597                        global_bmi_context,
598                        hints);
599    if (ret < 0)
600    {
601        /* error posting */
602        out_status_p->error_code = ret;
603        out_status_p->status_user_tag = status_user_tag;
604        dealloc_job_desc(jd);
605        jd = NULL;
606        return (1);
607    }
608
609    if (ret == 1)
610    {
611        /* immediate completion */
612        out_status_p->error_code = 0;
613        out_status_p->status_user_tag = status_user_tag;
614        out_status_p->actual_size = jd->u.bmi.actual_size;
615        dealloc_job_desc(jd);
616        jd = NULL;
617        return (ret);
618    }
619
620    /* if we fall to this point, the job did not immediately complete and
621     * we must queue up to test it later
622     */
623    *id = jd->job_id;
624    bmi_pending_count++;
625
626    return(job_time_mgr_add(jd, timeout_sec));
627}
628
629
630/* job_bmi_recv_list()
631 *
632 * posts a job to receive a BMI list message
633 *
634 * returns 0 on success, 1 on immediate completion, and -errno on
635 * failure
636 */
637int job_bmi_recv_list(PVFS_BMI_addr_t addr,
638                      void **buffer_list,
639                      bmi_size_t * size_list,
640                      int list_count,
641                      bmi_size_t total_expected_size,
642                      bmi_msg_tag_t tag,
643                      enum bmi_buffer_type buffer_type,
644                      void *user_ptr,
645                      job_aint status_user_tag,
646                      job_status_s * out_status_p,
647                      job_id_t * id,
648                      job_context_id context_id,
649                      int timeout_sec,
650                      PVFS_hint hints)
651{
652
653    /* post a bmi recv.  If it completes (or fails) immediately, then
654     * return and fill in the status structure.  If it needs to be tested
655     * for completion later, then queue up a job_desc structure.
656     */
657
658    int ret = -1;
659    struct job_desc *jd = NULL;
660    void* user_ptr_internal = NULL;
661
662    /* create the job desc first, even though we may not use it.  This
663     * gives us somewhere to store the BMI id and user ptr
664     */
665    jd = alloc_job_desc(JOB_BMI);
666    if (!jd)
667    {
668        out_status_p->error_code = -PVFS_ENOMEM;
669        return 1;
670    }
671    jd->hints = hints;
672    jd->job_user_ptr = user_ptr;
673    jd->context_id = context_id;
674    jd->status_user_tag = status_user_tag;
675    jd->bmi_callback.fn = bmi_thread_mgr_callback;
676    jd->bmi_callback.data = (void*)jd;
677    user_ptr_internal = &jd->bmi_callback;
678
679    ret = BMI_post_recv_list(&(jd->u.bmi.id), addr, buffer_list,
680                             size_list, list_count, total_expected_size,
681                             &(jd->u.bmi.actual_size), buffer_type, tag,
682                             user_ptr_internal, global_bmi_context, hints);
683
684    if (ret < 0)
685    {
686        /* error posting */
687        out_status_p->error_code = ret;
688        out_status_p->status_user_tag = status_user_tag;
689        dealloc_job_desc(jd);
690        jd = NULL;
691        return (1);
692    }
693
694    if (ret == 1)
695    {
696        /* immediate completion */
697        out_status_p->error_code = 0;
698        out_status_p->status_user_tag = status_user_tag;
699        out_status_p->actual_size = jd->u.bmi.actual_size;
700        dealloc_job_desc(jd);
701        jd = NULL;
702        return (ret);
703    }
704
705    /* if we fall to this point, the job did not immediately complete and
706     * we must queue up to test it later
707     */
708    *id = jd->job_id;
709    bmi_pending_count++;
710
711    return(job_time_mgr_add(jd, timeout_sec));
712}
713
714/* job_bmi_unexp()
715 *
716 * posts a job to receive an unexpected BMI message
717 *
718 * returns 0 on succcess, 1 on immediate completion, and -errno on
719 * failure
720 */
721int job_bmi_unexp(struct BMI_unexpected_info *bmi_unexp_d,
722                  void *user_ptr,
723                  job_aint status_user_tag,
724                  job_status_s * out_status_p,
725                  job_id_t * id,
726                  enum job_flags flags,
727                  job_context_id context_id)
728{
729    /* post a bmi recv for an unexpected message.  We will do a quick
730     * test to see if an unexpected message is available.  If so, we
731     * return the necessary info; if not we queue up to test again later
732     */
733
734    int ret = -1;
735    struct job_desc *jd = NULL;
736    int outcount = 0;
737
738    /* create the job desc first, even though we may not use it.  This
739     * gives us somewhere to store the BMI id and user ptr
740     */
741    jd = alloc_job_desc(JOB_BMI_UNEXP);
742    if (!jd)
743    {
744        out_status_p->error_code = -PVFS_ENOMEM;
745        return 1;
746    }
747    jd->job_user_ptr = user_ptr;
748    jd->u.bmi_unexp.info = bmi_unexp_d;
749    jd->context_id = context_id;
750    jd->status_user_tag = status_user_tag;
751
752   /*********************************************************
753         * TODO: consider optimizations later, so that we avoid
754         * disabling immediate completion.  See the mailing list thread
755         * started here:
756         * http://www.beowulf-underground.org/pipermail/pvfs2-internal/2003-February/000305.html
757         *
758         * At the moment, the server is not designed to be able to
759         * handle immediate completion upon posting unexpected jobs.
760         */
761
762    /* only look for immediate completion if our flags allow it */
763    if (!(flags & JOB_NO_IMMED_COMPLETE))
764    {
765        ret = BMI_testunexpected(1, &outcount, jd->u.bmi_unexp.info, 0);
766
767        if (ret < 0)
768        {
769            /* error testing */
770            dealloc_job_desc(jd);
771            jd = NULL;
772            out_status_p->error_code = ret;
773            return 1;
774        }
775
776        if (outcount == 1)
777        {
778            /* there was an unexpected job available */
779            out_status_p->error_code = jd->u.bmi_unexp.info->error_code;
780            out_status_p->status_user_tag = status_user_tag;
781            dealloc_job_desc(jd);
782            jd = NULL;
783            return (ret);
784        }
785    }
786
787    /* if we fall through to this point, then there were not any
788     * uenxpected receive's available; queue up to test later
789     */
790    gen_mutex_lock(&bmi_unexp_mutex);
791    *id = jd->job_id;
792    job_desc_q_add(bmi_unexp_queue, jd);
793    bmi_unexp_pending_count++;
794    gen_mutex_unlock(&bmi_unexp_mutex);
795
796    PINT_thread_mgr_bmi_unexp_handler(bmi_thread_mgr_unexp_handler);
797
798    return (0);
799}
800
801int job_bmi_unexp_cancel(job_id_t id)
802{
803    struct job_desc *jd;
804
805    gen_mutex_lock(&bmi_unexp_mutex);
806    jd = id_gen_safe_lookup(id);
807    job_desc_q_remove(jd);
808    bmi_unexp_pending_count--;
809    gen_mutex_unlock(&bmi_unexp_mutex);
810
811    gen_mutex_lock(&completion_mutex);
812    /* set completed flag while holding queue lock */
813    jd->completed_flag = 1;
814    if (completion_queue_array[jd->context_id])
815    {
816        job_desc_q_add(completion_queue_array[jd->context_id], jd);
817    }
818
819#ifdef __PVFS2_JOB_THREADED__
820    /* wake up anyone waiting for completion */
821    pthread_cond_signal(&completion_cond);
822#endif
823    gen_mutex_unlock(&completion_mutex);
824
825    return 0;
826}
827
828/* job_bmi_cancel()
829 *
830 * cancels a job handling a BMI message
831 *
832 * returns 0 on succcess, 1 on immediate completion, and -errno on
833 * failure
834 */
835int job_bmi_cancel(job_id_t id, job_context_id context_id)
836{
837    struct job_desc* query = NULL;
838    int ret = -1;
839
840    gen_mutex_lock(&completion_mutex);
841
842    query = id_gen_safe_lookup(id);
843    if (!query || query->completed_flag)
844    {
845        /* job has already completed, no cancellation needed */
846        gen_mutex_unlock(&completion_mutex);
847        return(0);
848    }
849
850    /* tell thread mgr to cancel operation.  This will result in
851     * normal completion path through thread mgr callbacks; no more
852     * work to do here */
853    ret = PINT_thread_mgr_bmi_cancel(
854        query->u.bmi.id, &(query->bmi_callback));
855
856    gen_mutex_unlock(&completion_mutex);
857
858    return(ret);
859}
860
861
862/* job_dev_unexp()
863 *
864 * posts a job for an unexpected device message
865 *
866 * returns 0 on success, -errno on failure, and 1 on immediate
867 * completion
868 */
869int job_dev_unexp(
870    struct PINT_dev_unexp_info* dev_unexp_d,
871    void* user_ptr,
872    job_aint status_user_tag,
873    job_status_s * out_status_p,
874    job_id_t* id,
875    enum job_flags flags,
876    job_context_id context_id)
877{
878    /* post a dev recv for an unexpected message.  We will do a quick
879     * test to see if an unexpected message is available.  If so, we
880     * return the necessary info; if not we queue up to test again later
881     */
882    int ret = -1;
883    struct job_desc *jd = NULL;
884    int outcount = 0;
885
886#ifndef __PVFS2_CLIENT__
887    return(-PVFS_ENOSYS);
888#endif
889
890    /* create the job desc first, even though we may not use it.  This
891     * gives us somewhere to store the user ptr etc.
892     */
893    jd = alloc_job_desc(JOB_DEV_UNEXP);
894    if (!jd)
895    {
896        out_status_p->error_code = -PVFS_ENOMEM;
897        return 1;
898    }
899    jd->job_user_ptr = user_ptr;
900    jd->u.dev_unexp.info = dev_unexp_d;
901    jd->context_id = context_id;
902    jd->status_user_tag = status_user_tag;
903
904    /* only look for immediate completion if our flags alow it */
905    if (!(flags & JOB_NO_IMMED_COMPLETE))
906    {
907        ret = PINT_dev_test_unexpected(
908            1, &outcount, jd->u.dev_unexp.info, 0);
909
910        if (ret < 0)
911        {
912            /* error testing */
913            dealloc_job_desc(jd);
914            jd = NULL;
915            out_status_p->error_code = ret;
916            return 1;
917        }
918
919        if (outcount == 1)
920        {
921            /* there was an unexpected job available */
922            out_status_p->error_code = 0;
923            out_status_p->status_user_tag = status_user_tag;
924            dealloc_job_desc(jd);
925            jd = NULL;
926            return (ret);
927        }
928    }
929
930    /* if we fall through to this point, then there were not any
931     * uenxpected receive's available (or none requested); queue up to
932     * test later
933     */
934    gen_mutex_lock(&dev_unexp_mutex);
935    *id = jd->job_id;
936    job_desc_q_add(dev_unexp_queue, jd);
937    dev_unexp_pending_count++;
938    gen_mutex_unlock(&dev_unexp_mutex);
939
940    PINT_thread_mgr_dev_unexp_handler(dev_thread_mgr_unexp_handler);
941
942    return (0);
943}
944
945/* job_dev_write()
946 *
947 * posts a device write
948 *
949 * returns 0 on success, -errno on failure, and 1 on immediate completion
950 */
951int job_dev_write(void* buffer,
952    int size,
953    PVFS_id_gen_t tag,
954    enum PINT_dev_buffer_type buffer_type,
955    void* user_ptr,
956    job_aint status_user_tag,
957    job_status_s * out_status_p,
958    job_id_t * id,
959    job_context_id context_id)
960{
961    int ret = -1;
962
963    /* NOTE: This function will _always_ immediately complete for now. 
964     * It is really just in the job interface for completeness, in case we
965     * decide later to make the function asynchronous
966     */
967
968#ifndef __PVFS2_CLIENT__
969    return(-PVFS_ENOSYS);
970#endif
971
972    ret = PINT_dev_write(buffer, size, buffer_type, tag);
973    if(ret < 0)
974    {
975        /* error posting */
976        out_status_p->error_code = ret;
977        out_status_p->status_user_tag = status_user_tag;
978        return(1);
979    }
980
981    /* immediate completion */
982    out_status_p->error_code = 0;
983    out_status_p->status_user_tag = status_user_tag;
984    out_status_p->actual_size = size;
985    return(1);
986}
987
988
989/* job_dev_write_list()
990 *
991 * posts a device write for multiple buffers
992 *
993 * returns 0 on success, -errno on failure, and 1 on immediate completion
994 */
995int job_dev_write_list(void** buffer_list,
996    int* size_list,
997    int list_count,
998    int total_size,
999    PVFS_id_gen_t tag,
1000    enum PINT_dev_buffer_type buffer_type,
1001    void* user_ptr,
1002    job_aint status_user_tag,
1003    job_status_s* out_status_p,
1004    job_id_t* id,
1005    job_context_id context_id)
1006{
1007    int ret = -1;
1008
1009    /* NOTE: This function will _always_ immediately complete for now. 
1010     * It is really just in the job interface for completeness, in case we
1011     * decide later to make the function asynchronous
1012     */
1013
1014#ifndef __PVFS2_CLIENT__
1015    return(-PVFS_ENOSYS);
1016#endif
1017
1018    ret = PINT_dev_write_list(buffer_list, size_list, list_count,
1019        total_size, buffer_type, tag);
1020    if(ret < 0)
1021    {
1022        /* error posting */
1023        out_status_p->error_code = ret;
1024        out_status_p->status_user_tag = status_user_tag;
1025        return(1);
1026    }
1027
1028    /* immediate completion */
1029    out_status_p->error_code = 0;
1030    out_status_p->status_user_tag = status_user_tag;
1031    out_status_p->actual_size = total_size;
1032    return(1);
1033}
1034
1035
1036/* job_req_sched_post()
1037 *
1038 * posts a request to the request scheduler
1039 *
1040 * returns 0 on success, -errno on failure, and 1 on immediate
1041 * completion
1042 */
1043int job_req_sched_post(enum PVFS_server_op op,
1044                       PVFS_fs_id fs_id,
1045                       PVFS_handle handle,
1046                       enum PINT_server_req_access_type access_type,
1047                       enum PINT_server_sched_policy sched_policy,
1048                       void *user_ptr,
1049                       job_aint status_user_tag,
1050                       job_status_s * out_status_p,
1051                       job_id_t * id,
1052                       job_context_id context_id)
1053{
1054    /* post a request to the scheduler.  If it completes (or fails)
1055     * immediately, then return and fill in the status structure.
1056     * If it needs to be tested for completion later, then queue up
1057     * a job_desc structure.       
1058     */
1059    /* NOTE: this function is unique in the job interface because
1060     * we have to track a job id even when it completes (to be
1061     * matched in release function).  We will use a seperate queue
1062     * for this.
1063     */
1064    struct job_desc *jd = NULL;
1065    int ret = -1;
1066
1067    jd = alloc_job_desc(JOB_REQ_SCHED);
1068    if (!jd)
1069    {
1070        out_status_p->error_code = -PVFS_ENOMEM;
1071        return 1;
1072    }
1073    jd->job_user_ptr = user_ptr;
1074    jd->u.req_sched.post_flag = 1;
1075    jd->context_id = context_id;
1076    jd->status_user_tag = status_user_tag;
1077
1078    ret = PINT_req_sched_post(
1079        op, fs_id, handle, access_type, sched_policy, jd, &(jd->u.req_sched.id));
1080
1081    if (ret < 0)
1082    {
1083        /* error posting */
1084        dealloc_job_desc(jd);
1085        jd = NULL;
1086        out_status_p->error_code = ret;
1087        out_status_p->status_user_tag = status_user_tag;
1088        return (1);
1089    }
1090
1091    if (ret == 1)
1092    {
1093        /* immediate completion */
1094        out_status_p->error_code = 0;
1095        out_status_p->status_user_tag = status_user_tag;
1096        *id = jd->job_id;
1097        /* don't delete the job desc until a matching release comes through */
1098        return (1);
1099    }
1100
1101    /* if we hit this point, job did not immediately complete-
1102     * queue to test later
1103     */
1104    *id = jd->job_id;
1105
1106    return (0);
1107}
1108
1109int job_req_sched_change_mode(enum PVFS_server_mode mode,
1110                              void *user_ptr,
1111                              job_aint status_user_tag,
1112                              job_status_s *out_status_p,
1113                              job_id_t *id,
1114                              job_context_id context_id)
1115{
1116    struct job_desc *jd = NULL;
1117    int ret = -1;
1118
1119    jd = alloc_job_desc(JOB_REQ_SCHED);
1120    if(!jd)
1121    {
1122        out_status_p->error_code = -PVFS_ENOMEM;
1123        return 1;
1124    }
1125    jd->job_user_ptr = user_ptr;
1126    jd->u.req_sched.post_flag = 1;
1127    jd->context_id = context_id;
1128    jd->status_user_tag = status_user_tag;
1129
1130    ret = PINT_req_sched_change_mode(mode, jd, &(jd->u.req_sched.id));
1131    if (ret < 0)
1132    {
1133        /* error posting */
1134        dealloc_job_desc(jd);
1135        jd = NULL;
1136        out_status_p->error_code = ret;
1137        out_status_p->status_user_tag = status_user_tag;
1138        return (1);
1139    }
1140
1141    if (ret == 1)
1142    {
1143        /* immediate completion */
1144        out_status_p->error_code = 0;
1145        out_status_p->status_user_tag = status_user_tag;
1146        *id = jd->job_id;
1147        /* don't delete the job desc until a matching release comes through */
1148        return (1);
1149    }
1150
1151    *id = jd->job_id;
1152    return (0);
1153}
1154
1155/* job_req_sched_post_timer()
1156 *
1157 * posts a timer to the request scheduler
1158 *
1159 * returns 0 on success, -errno on failure, and 1 on immediate
1160 * completion
1161 */
1162int job_req_sched_post_timer(int msecs,
1163                       void *user_ptr,
1164                       job_aint status_user_tag,
1165                       job_status_s * out_status_p,
1166                       job_id_t * id,
1167                       job_context_id context_id)
1168{
1169    /* post a timer to the scheduler.  If it completes (or fails)
1170     * immediately, then return and fill in the status structure.
1171     * If it needs to be tested for completion later, then queue up
1172     * a job_desc structure.       
1173     */
1174
1175    struct job_desc *jd = NULL;
1176    int ret = -1;
1177
1178    jd = alloc_job_desc(JOB_REQ_SCHED);
1179    if (!jd)
1180    {
1181        out_status_p->error_code = -PVFS_ENOMEM;
1182        return 1;
1183    }
1184    jd->job_user_ptr = user_ptr;
1185    jd->context_id = context_id;
1186    jd->status_user_tag = status_user_tag;
1187
1188    ret = PINT_req_sched_post_timer(msecs, jd, &(jd->u.req_sched.id));
1189
1190    if (ret < 0)
1191    {
1192        /* error posting */
1193        out_status_p->error_code = ret;
1194        out_status_p->status_user_tag = status_user_tag;
1195        dealloc_job_desc(jd);
1196        jd = NULL;
1197        return (1);
1198    }
1199
1200    if (ret == 1)
1201    {
1202        /* immediate completion */
1203        out_status_p->error_code = 0;
1204        out_status_p->status_user_tag = status_user_tag;
1205        dealloc_job_desc(jd);
1206        jd = NULL;
1207        return (1);
1208    }
1209
1210    /* if we hit this point, job did not immediately complete-
1211     * queue to test later
1212     */
1213    if (id)
1214        *id = jd->job_id;
1215
1216    return (0);
1217}
1218
1219
1220/* job_req_sched_release
1221 *
1222 * releases a request from the request scheduler
1223 *
1224 * returns 0 on success, -errno on failure, and 1 on immediate
1225 * completion
1226 */
1227int job_req_sched_release(job_id_t in_completed_id,
1228                          void *user_ptr,
1229                          job_aint status_user_tag,
1230                          job_status_s * out_status_p,
1231                          job_id_t * out_id,
1232                          job_context_id context_id)
1233{
1234    /* this function is a little odd.  We need to do is retrieve the
1235     * job desc that we queued up in the inprogress queue to match the
1236     * release properly
1237     */
1238    struct job_desc *match_jd = NULL;
1239    struct job_desc *jd = NULL;
1240    int ret = -1;
1241
1242    jd = alloc_job_desc(JOB_REQ_SCHED);
1243    if (!jd)
1244    {
1245        out_status_p->error_code = -PVFS_ENOMEM;
1246        return 1;
1247    }
1248    jd->job_user_ptr = user_ptr;
1249    jd->context_id = context_id;
1250    jd->status_user_tag = status_user_tag;
1251
1252    match_jd = id_gen_safe_lookup(in_completed_id);
1253    if (!match_jd)
1254    {
1255        /* id has been released or was not registered */
1256        gossip_err("Error: job_req_sched_release() failed to locate descriptor.\n");
1257        out_status_p->error_code = -PVFS_EINVAL;
1258        out_status_p->status_user_tag = status_user_tag;
1259        dealloc_job_desc(jd);
1260        jd = NULL;
1261        return 1;
1262    }
1263
1264    ret = PINT_req_sched_release(match_jd->u.req_sched.id, jd,
1265                                 &(jd->u.req_sched.id));
1266
1267    /* delete the old req sched job desc; it is no longer needed */
1268    dealloc_job_desc(match_jd);
1269    match_jd = NULL;
1270
1271    if (ret < 0)
1272    {
1273        /* error posting */
1274        dealloc_job_desc(jd);
1275        jd = NULL;
1276        out_status_p->error_code = -PVFS_ENOMEM;
1277        return 1;
1278    }
1279
1280    if (ret == 1)
1281    {
1282        /* immediate completion */
1283        out_status_p->error_code = 0;
1284        out_status_p->status_user_tag = status_user_tag;
1285        dealloc_job_desc(jd);
1286        jd = NULL;
1287        return (1);
1288    }
1289
1290    /* if we hit this point, job did not immediately complete-
1291     * queue to test later
1292     */
1293    *out_id = jd->job_id;
1294
1295    return (0);
1296}
1297
1298
1299/* job_flow()
1300 *
1301 * posts a job to service a flow (where a flow is a complex I/O
1302 * operation between two endpoints, which may be memory, disk, or
1303 * network)
1304 *
1305 * returns 0 on success, 1 on immediate completion, and -errno on
1306 * failure
1307 */
1308int job_flow(flow_descriptor * flow_d,
1309             void *user_ptr,
1310             job_aint status_user_tag,
1311             job_status_s * out_status_p,
1312             job_id_t * id,
1313             job_context_id context_id,
1314             int timeout_sec,
1315             PVFS_hint hints)
1316{
1317    struct job_desc *jd = NULL;
1318    int ret = -1;
1319
1320    /* allocate job descriptor first */
1321    jd = alloc_job_desc(JOB_FLOW);
1322    if (!jd)
1323    {
1324        out_status_p->error_code = -PVFS_ENOMEM;
1325        return 1;
1326    }
1327    jd->hints = hints;
1328    flow_d->hints = hints;
1329    jd->job_user_ptr = user_ptr;
1330    jd->u.flow.flow_d = flow_d;
1331    jd->context_id = context_id;
1332    jd->status_user_tag = status_user_tag;
1333    flow_d->user_ptr = jd;
1334    flow_d->callback = flow_callback;
1335
1336    /* post the flow */
1337    ret = PINT_flow_post(flow_d);
1338    if (ret < 0)
1339    {
1340        out_status_p->error_code = ret;
1341        out_status_p->status_user_tag = status_user_tag;
1342        dealloc_job_desc(jd);
1343        jd = NULL;
1344        return (1);
1345    }
1346    if (ret == 1)
1347    {
1348        /* immediate completion */
1349        out_status_p->error_code = 0;
1350        out_status_p->status_user_tag = status_user_tag;
1351        out_status_p->actual_size = flow_d->total_transferred;
1352        dealloc_job_desc(jd);
1353        jd = NULL;
1354        return (1);
1355    }
1356
1357    /* queue up the job desc. for later completion */
1358    *id = jd->job_id;
1359    flow_pending_count++;
1360    gossip_debug(GOSSIP_FLOW_DEBUG, "Job flows in progress (post time): %d\n",
1361            flow_pending_count);
1362
1363    return(job_time_mgr_add(jd, timeout_sec));
1364}
1365
1366/* job_flow_cancel()
1367 *
1368 * cancels a posted job that is servicing a flow (where a flow is a
1369 * complex I/O operation between two endpoints, which may be memory,
1370 * disk, or network)
1371 *
1372 * returns 0 on success, 1 on immediate completion, and -errno on
1373 * failure
1374 */
1375int job_flow_cancel(job_id_t id, job_context_id context_id)
1376{
1377    struct job_desc* query = NULL;
1378    int ret = -1;
1379
1380    gen_mutex_lock(&completion_mutex);
1381
1382    query = id_gen_safe_lookup(id);
1383
1384    if (!query || query->completed_flag)
1385    {
1386        /* job has already completed, no cancellation needed */
1387        gen_mutex_unlock(&completion_mutex);
1388        return(0);
1389    }
1390
1391    /* cancel flow.  Normal completion path through flow callback will still
1392     * occur; no more work to do here.  NOTE: flow checks for races against
1393     * flow descriptors for which the callback process has already started
1394     */
1395    ret = PINT_flow_cancel(query->u.flow.flow_d);
1396
1397    gen_mutex_unlock(&completion_mutex);
1398
1399    return(ret);
1400}
1401
1402int job_trove_bstream_write_list(TROVE_coll_id coll_id,
1403                                 TROVE_handle handle,
1404                                 char **mem_offset_array,
1405                                 TROVE_size *mem_size_array,
1406                                 int mem_count,
1407                                 TROVE_offset *stream_offset_array,
1408                                 TROVE_size *stream_size_array,
1409                                 int stream_count,
1410                                 TROVE_size *out_size_p,
1411                                 TROVE_ds_flags flags,
1412                                 TROVE_vtag_s *vtag,
1413                                 void * user_ptr,
1414                                 job_aint status_user_tag,
1415                                 job_status_s *out_status_p,
1416                                 job_id_t *id,
1417                                 job_context_id context_id,
1418                                 PVFS_hint hints)
1419{
1420    /* post a trove write.  If it completes (or fails) immediately, then
1421     * return and fill in the status structure.  If it needs to be tested
1422     * for completion later, then queue up a job_desc structure.
1423     */
1424    int ret = -1;
1425    struct job_desc *jd = NULL;
1426    void* user_ptr_internal;
1427
1428    /* create the job desc first, even though we may not use it.  This
1429     * gives us somewhere to store the BMI id and user ptr
1430     */
1431    jd = alloc_job_desc(JOB_TROVE);
1432    if (!jd)
1433    {
1434        out_status_p->error_code = -PVFS_ENOMEM;
1435        return 1;
1436    }
1437    jd->job_user_ptr = user_ptr;
1438    jd->hints = hints;
1439    jd->u.trove.vtag = vtag;
1440    jd->u.trove.out_size_p = out_size_p;
1441    jd->context_id = context_id;
1442    jd->status_user_tag = status_user_tag;
1443    jd->trove_callback.fn = trove_thread_mgr_callback;
1444    jd->trove_callback.data = (void*)jd;
1445    user_ptr_internal = &jd->trove_callback;
1446
1447#ifdef __PVFS2_TROVE_SUPPORT__
1448    ret = trove_bstream_write_list(coll_id, handle,
1449                                   mem_offset_array, mem_size_array,
1450                                   mem_count,
1451                                   stream_offset_array, stream_size_array,
1452                                   stream_count,
1453                                   out_size_p,
1454                                   flags,
1455                                   vtag,
1456                                   user_ptr_internal,
1457                                   global_trove_context,
1458                                   &(jd->u.trove.id), hints);
1459#else
1460    gossip_err("Error: Trove support not enabled.\n");
1461    ret = -ENOSYS;
1462#endif
1463
1464    if (ret < 0)
1465    {
1466        /* error posting trove operation */
1467        dealloc_job_desc(jd);
1468        jd = NULL;
1469        out_status_p->error_code = ret;
1470        out_status_p->status_user_tag = status_user_tag;
1471        return (1);
1472    }
1473
1474    if (ret == 1)
1475    {
1476        /* immediate completion */
1477        out_status_p->error_code = 0;
1478        out_status_p->status_user_tag = status_user_tag;
1479        out_status_p->actual_size = *out_size_p;
1480        out_status_p->vtag = jd->u.trove.vtag;
1481        dealloc_job_desc(jd);
1482        jd = NULL;
1483        return (ret);
1484    }
1485
1486    /* if we fall through to this point, the job did not
1487     * immediately complete and we must queue up to test later
1488     */
1489    *id = jd->job_id;
1490    trove_pending_count++;
1491
1492    return (0);
1493}
1494
1495int job_trove_bstream_read_list(PVFS_fs_id coll_id,
1496                                PVFS_handle handle,
1497                                char **mem_offset_array,
1498                                PVFS_size *mem_size_array,
1499                                int mem_count,
1500                                PVFS_offset *stream_offset_array,
1501                                PVFS_size *stream_size_array,
1502                                int stream_count,
1503                                PVFS_size *out_size_p,
1504                                PVFS_ds_flags flags,
1505                                PVFS_vtag *vtag,
1506                                void * user_ptr,
1507                                job_aint status_user_tag,
1508                                job_status_s * out_status_p,
1509                                job_id_t * id,
1510                                job_context_id context_id,
1511                                PVFS_hint hints)
1512{
1513    /* post a trove read.  If it completes (or fails) immediately, then
1514     * return and fill in the status structure.  If it needs to be tested
1515     * for completion later, then queue up a job_desc structure.
1516     */
1517    int ret = -1;
1518    struct job_desc *jd = NULL;
1519    void* user_ptr_internal;
1520
1521    /* create the job desc first, even though we may not use it.  This
1522     * gives us somewhere to store the BMI id and user ptr
1523     */
1524    jd = alloc_job_desc(JOB_TROVE);
1525    if (!jd)
1526    {
1527        out_status_p->error_code = -PVFS_ENOMEM;
1528        return 1;
1529    }
1530    jd->hints = hints;
1531    jd->job_user_ptr = user_ptr;
1532    jd->u.trove.vtag = vtag;
1533    jd->u.trove.out_size_p = out_size_p;
1534    jd->context_id = context_id;
1535    jd->status_user_tag = status_user_tag;
1536    jd->trove_callback.fn = trove_thread_mgr_callback;
1537    jd->trove_callback.data = (void*)jd;
1538    user_ptr_internal = &jd->trove_callback;
1539
1540#ifdef __PVFS2_TROVE_SUPPORT__
1541    ret = trove_bstream_read_list(coll_id, handle,
1542                                  mem_offset_array, mem_size_array,
1543                                  mem_count,
1544                                  stream_offset_array, stream_size_array,
1545                                  stream_count,
1546                                  out_size_p, flags,
1547                                  jd->u.trove.vtag, user_ptr_internal,
1548                                  global_trove_context,
1549                                  &(jd->u.trove.id), hints);
1550#else
1551    gossip_err("Error: Trove support not enabled.\n");
1552    ret = -ENOSYS;
1553#endif
1554
1555    if (ret < 0)
1556    {
1557        /* error posting trove operation */
1558        dealloc_job_desc(jd);
1559        jd = NULL;
1560        out_status_p->error_code = ret;
1561        out_status_p->status_user_tag = status_user_tag;
1562        return (1);
1563    }
1564
1565    if (ret == 1)
1566    {
1567        /* immediate completion */
1568        out_status_p->error_code = 0;
1569        out_status_p->status_user_tag = status_user_tag;
1570        out_status_p->actual_size = *out_size_p;
1571        out_status_p->vtag = jd->u.trove.vtag;
1572        dealloc_job_desc(jd);
1573        jd = NULL;
1574        return (ret);
1575    }
1576
1577    /* if we fall through to this point, the job did not
1578     * immediately complete and we must queue up to test later
1579     */
1580    *id = jd->job_id;
1581    trove_pending_count++;
1582
1583    return (0);
1584}
1585
1586/* job_trove_bstream_flush()
1587 *
1588 * ask the storage layer to flush data to disk
1589 *
1590 * returns 0 on success, 1 on immediate completion, and -errno on failure
1591 */
1592
1593int job_trove_bstream_flush(PVFS_fs_id coll_id,
1594                            PVFS_handle handle,
1595                            PVFS_ds_flags flags,
1596                            void *user_ptr,
1597                            job_aint status_user_tag,
1598                            job_status_s * out_status_p,
1599                            job_id_t * id,
1600                            job_context_id context_id,
1601                            PVFS_hint hints)
1602
1603{
1604    int ret = -1;
1605    struct job_desc *jd = NULL;
1606    void* user_ptr_internal;
1607
1608    /* create the job desc first, even though we may not use it.  This
1609     * gives us somewhere to store the BMI id and user ptr
1610     */
1611    jd = alloc_job_desc(JOB_TROVE);
1612    if (!jd)
1613    {
1614        out_status_p->error_code = -PVFS_ENOMEM;
1615        return 1;
1616    }
1617    jd->job_user_ptr = user_ptr;
1618    jd->context_id = context_id;
1619    jd->status_user_tag = status_user_tag;
1620    jd->trove_callback.fn = trove_thread_mgr_callback;
1621    jd->trove_callback.data = (void*)jd;
1622    user_ptr_internal = &jd->trove_callback;
1623
1624#ifdef __PVFS2_TROVE_SUPPORT__
1625    ret = trove_bstream_flush(coll_id, handle, flags, user_ptr_internal,
1626                              global_trove_context, &(jd->u.trove.id), hints);
1627#else
1628    gossip_err("Error: Trove support not enabled.\n");
1629    ret = -ENOSYS;
1630#endif
1631
1632    if (ret < 0)
1633    {
1634        /* error posting trove operation */
1635        dealloc_job_desc(jd);
1636        jd = NULL;
1637        out_status_p->error_code = ret;
1638        out_status_p->status_user_tag = status_user_tag;
1639        return (1);
1640    }
1641    if (ret == 1)
1642    {
1643        /* immediate completion */
1644        out_status_p->error_code = 0;
1645        out_status_p->status_user_tag = status_user_tag;
1646        dealloc_job_desc(jd);
1647        jd = NULL;
1648        return (ret);
1649    }
1650    /* if we fall through to this point, the job did not
1651     * immediately complete and we must queue up to test later
1652     */
1653    *id = jd->job_id;
1654    trove_pending_count++;
1655
1656    return (0);
1657}
1658
1659/* job_trove_keyval_read()
1660 *
1661 * storage key/value read
1662 *
1663 * returns 0 on success, 1 on immediate completion, and -errno on
1664 * failure
1665 */
1666int job_trove_keyval_read(PVFS_fs_id coll_id,
1667                          PVFS_handle handle,
1668                          PVFS_ds_keyval * key_p,
1669                          PVFS_ds_keyval * val_p,
1670                          PVFS_ds_flags flags,
1671                          PVFS_vtag * vtag,
1672                          void *user_ptr,
1673                          job_aint status_user_tag,
1674                          job_status_s * out_status_p,
1675                          job_id_t * id,
1676                          job_context_id context_id,
1677                          PVFS_hint hints)
1678{
1679    /* post a trove keyval read.  If it completes (or fails)
1680     * immediately, then return and fill in the status structure.
1681     * If it needs to be tested for completion later, then queue
1682     * up a job_desc structure.  */
1683    int ret = -1;
1684    struct job_desc *jd = NULL;
1685    void* user_ptr_internal;
1686
1687    /* create the job desc first, even though we may not use it.  This
1688     * gives us somewhere to store the BMI id and user ptr
1689     */
1690    jd = alloc_job_desc(JOB_TROVE);
1691    if (!jd)
1692    {
1693        out_status_p->error_code = -PVFS_ENOMEM;
1694        return 1;
1695    }
1696    jd->hints = hints;
1697    jd->job_user_ptr = user_ptr;
1698    jd->u.trove.vtag = vtag;
1699    jd->context_id = context_id;
1700    jd->status_user_tag = status_user_tag;
1701    jd->trove_callback.fn = trove_thread_mgr_callback;
1702    jd->trove_callback.data = (void*)jd;
1703    user_ptr_internal = &jd->trove_callback;
1704
1705#ifdef __PVFS2_TROVE_SUPPORT__
1706    ret = trove_keyval_read(coll_id, handle, key_p, val_p, flags,
1707                            jd->u.trove.vtag, user_ptr_internal,
1708                            global_trove_context, &(jd->u.trove.id), hints);
1709#else
1710    gossip_err("Error: Trove support not enabled.\n");
1711    ret = -ENOSYS;
1712#endif
1713
1714    if (ret < 0)
1715    {
1716        /* error posting trove operation */
1717        dealloc_job_desc(jd);
1718        jd = NULL;
1719        out_status_p->error_code = ret;
1720        out_status_p->status_user_tag = status_user_tag;
1721        return (1);
1722    }
1723
1724    if (ret == 1)
1725    {
1726        /* immediate completion */
1727        out_status_p->error_code = 0;
1728        out_status_p->status_user_tag = status_user_tag;
1729        out_status_p->vtag = jd->u.trove.vtag;
1730        dealloc_job_desc(jd);
1731        jd = NULL;
1732        return (ret);
1733    }
1734
1735    /* if we fall through to this point, the job did not
1736     * immediately complete and we must queue up to test later
1737     */
1738    *id = jd->job_id;
1739    trove_pending_count++;
1740
1741    return (0);
1742}
1743
1744/* job_trove_keyval_read_list()
1745 *
1746 * storage key/value read list
1747 *
1748 * returns 0 on success, 1 on immediate completion, and -errno on
1749 * failure
1750 */
1751int job_trove_keyval_read_list(PVFS_fs_id coll_id,
1752                               PVFS_handle handle,
1753                               PVFS_ds_keyval * key_array,
1754                               PVFS_ds_keyval * val_array,
1755                               PVFS_error * err_array,
1756                               int count,
1757                               PVFS_ds_flags flags,
1758                               PVFS_vtag * vtag,
1759                               void *user_ptr,
1760                               job_aint status_user_tag,
1761                               job_status_s * out_status_p,
1762                               job_id_t * id,
1763                               job_context_id context_id,
1764                               PVFS_hint hints)
1765{
1766    /* post a trove keyval read.  If it completes (or fails)
1767     * immediately, then return and fill in the status structure.
1768     * If it needs to be tested for completion later, then queue
1769     * up a job_desc structure.  */
1770    int ret = -1;
1771    struct job_desc *jd = NULL;
1772    void* user_ptr_internal;
1773
1774    /* create the job desc first, even though we may not use it.  This
1775     * gives us somewhere to store the BMI id and user ptr
1776     */
1777    jd = alloc_job_desc(JOB_TROVE);
1778    if (!jd)
1779    {
1780        out_status_p->error_code = -PVFS_ENOMEM;
1781        return 1;
1782    }
1783    jd->hints = hints;
1784    jd->job_user_ptr = user_ptr;
1785    jd->u.trove.vtag = vtag;
1786    jd->context_id = context_id;
1787    jd->status_user_tag = status_user_tag;
1788    jd->trove_callback.fn = trove_thread_mgr_callback;
1789    jd->trove_callback.data = (void*)jd;
1790    user_ptr_internal = &jd->trove_callback;
1791
1792#ifdef __PVFS2_TROVE_SUPPORT__
1793    ret = trove_keyval_read_list(coll_id, handle, key_array, val_array,
1794                                 err_array, count, flags, jd->u.trove.vtag,
1795                                 user_ptr_internal,
1796                                 global_trove_context, &(jd->u.trove.id), hints);
1797#else
1798    gossip_err("Error: Trove support not enabled.\n");
1799    ret = -ENOSYS;
1800#endif
1801
1802    if (ret < 0)
1803    {
1804        /* error posting trove operation */
1805        dealloc_job_desc(jd);
1806        jd = NULL;
1807        out_status_p->error_code = ret;
1808        out_status_p->status_user_tag = status_user_tag;
1809        return (1);
1810    }
1811
1812    if (ret == 1)
1813    {
1814        /* immediate completion */
1815        out_status_p->error_code = 0;
1816        out_status_p->status_user_tag = status_user_tag;
1817        out_status_p->vtag = jd->u.trove.vtag;
1818        dealloc_job_desc(jd);
1819        jd = NULL;
1820        return (ret);
1821    }
1822
1823    /* if we fall through to this point, the job did not
1824     * immediately complete and we must queue up to test later
1825     */
1826    *id = jd->job_id;
1827    trove_pending_count++;
1828
1829    return (0);
1830}
1831
1832/* job_trove_keyval_write()
1833 *
1834 * storage key/value write
1835 *
1836 * returns 0 on success, 1 on immediate completion, and -errno on
1837 * failure
1838 */
1839int job_trove_keyval_write(PVFS_fs_id coll_id,
1840                           PVFS_handle handle,
1841                           PVFS_ds_keyval * key_p,
1842                           PVFS_ds_keyval * val_p,
1843                           PVFS_ds_flags flags,
1844                           PVFS_vtag * vtag,
1845                           void *user_ptr,
1846                           job_aint status_user_tag,
1847                           job_status_s * out_status_p,
1848                           job_id_t * id,
1849                           job_context_id context_id,
1850                           PVFS_hint hints)
1851{
1852    /* post a trove keyval write.  If it completes (or fails)
1853     * immediately, then return and fill in the status structure.
1854     * If it needs to be tested for completion later, then queue
1855     * up a job_desc structure.  */
1856    int ret = -1;
1857    struct job_desc *jd = NULL;
1858    void* user_ptr_internal;
1859
1860    /* create the job desc first, even though we may not use it.  This
1861     * gives us somewhere to store the BMI id and user ptr
1862     */
1863    jd = alloc_job_desc(JOB_TROVE);
1864    if (!jd)
1865    {
1866        out_status_p->error_code = -PVFS_ENOMEM;
1867        return 1;
1868    }
1869    jd->hints = hints;
1870    jd->job_user_ptr = user_ptr;
1871    jd->u.trove.vtag = vtag;
1872    jd->context_id = context_id;
1873    jd->status_user_tag = status_user_tag;
1874    jd->trove_callback.fn = trove_thread_mgr_callback;
1875    jd->trove_callback.data = (void*)jd;
1876    user_ptr_internal = &jd->trove_callback;
1877
1878#ifdef __PVFS2_TROVE_SUPPORT__
1879    ret = trove_keyval_write(coll_id, handle, key_p, val_p, flags,
1880                             jd->u.trove.vtag, user_ptr_internal,
1881                             global_trove_context,
1882                             &(jd->u.trove.id), hints);
1883#else
1884    gossip_err("Error: Trove support not enabled.\n");
1885    ret = -ENOSYS;
1886#endif
1887
1888    if (ret < 0)
1889    {
1890        /* error posting trove operation */
1891        dealloc_job_desc(jd);
1892        jd = NULL;
1893        out_status_p->error_code = ret;
1894        out_status_p->status_user_tag = status_user_tag;
1895        return (1);
1896    }
1897
1898    if (ret == 1)
1899    {
1900        /* immediate completion */
1901        out_status_p->error_code = 0;
1902        out_status_p->status_user_tag = status_user_tag;
1903        out_status_p->vtag = jd->u.trove.vtag;
1904        dealloc_job_desc(jd);
1905        jd = NULL;
1906        return (ret);
1907    }
1908
1909    /* if we fall through to this point, the job did not
1910     * immediately complete and we must queue up to test later
1911     */
1912    *id = jd->job_id;
1913    trove_pending_count++;
1914
1915    return (0);
1916}
1917
1918/* job_trove_keyval_write_list()
1919 *
1920 * storage key/value list write
1921 *
1922 * returns 0 on success, 1 on immediate completion, and -errno on
1923 * failure
1924 */
1925int job_trove_keyval_write_list(PVFS_fs_id coll_id,
1926                           PVFS_handle handle,
1927                           PVFS_ds_keyval * key_array,
1928                           PVFS_ds_keyval * val_array,
1929                           int32_t count,
1930                           PVFS_ds_flags flags,
1931                           PVFS_vtag * vtag,
1932                           void *user_ptr,
1933                           job_aint status_user_tag,
1934                           job_status_s * out_status_p,
1935                           job_id_t * id,
1936                           job_context_id context_id,
1937                           PVFS_hint hints)
1938{
1939    /* post a trove keyval write.  If it completes (or fails)
1940     * immediately, then return and fill in the status structure.
1941     * If it needs to be tested for completion later, then queue
1942     * up a job_desc structure.  */
1943    int ret = -1;
1944    struct job_desc *jd = NULL;
1945    void* user_ptr_internal;
1946
1947    /* create the job desc first, even though we may not use it.  This
1948     * gives us somewhere to store the BMI id and user ptr
1949     */
1950    jd = alloc_job_desc(JOB_TROVE);
1951    if (!jd)
1952    {
1953        out_status_p->error_code = -PVFS_ENOMEM;
1954        return 1;
1955    }
1956    jd->hints = hints;
1957    jd->job_user_ptr = user_ptr;
1958    jd->u.trove.vtag = vtag;
1959    jd->context_id = context_id;
1960    jd->status_user_tag = status_user_tag;
1961    jd->trove_callback.fn = trove_thread_mgr_callback;
1962    jd->trove_callback.data = (void*)jd;
1963    user_ptr_internal = &jd->trove_callback;
1964
1965#ifdef __PVFS2_TROVE_SUPPORT__
1966    gossip_debug(GOSSIP_JOB_DEBUG, "job_trove_keyval_write_list() posting trove_keyval_write_list()\n");
1967    ret = trove_keyval_write_list(coll_id, handle,
1968                             key_array, val_array,
1969                             count, flags,
1970                             jd->u.trove.vtag, user_ptr_internal,
1971                             global_trove_context,
1972                             &(jd->u.trove.id), hints);
1973#else
1974    gossip_err("Error: Trove support not enabled.\n");
1975    ret = -ENOSYS;
1976#endif
1977
1978    if (ret < 0)
1979    {
1980        /* error posting trove operation */
1981        dealloc_job_desc(jd);
1982        jd = NULL;
1983        out_status_p->error_code = ret;
1984        out_status_p->status_user_tag = status_user_tag;
1985        return (1);
1986    }
1987
1988    if (ret == 1)
1989    {
1990        /* immediate completion */
1991        out_status_p->error_code = 0;
1992        out_status_p->status_user_tag = status_user_tag;
1993        out_status_p->vtag = jd->u.trove.vtag;
1994        dealloc_job_desc(jd);
1995        jd = NULL;
1996        return (ret);
1997    }
1998
1999    /* if we fall through to this point, the job did not
2000     * immediately complete and we must queue up to test later
2001     */
2002    *id = jd->job_id;
2003    trove_pending_count++;
2004
2005    return (0);
2006}
2007
2008int job_trove_keyval_remove_list(PVFS_fs_id coll_id,
2009                                  PVFS_handle handle,
2010                                  PVFS_ds_keyval * key_array,
2011                                  PVFS_ds_keyval * val_array,
2012                                  int * error_array,
2013                                  int count,
2014                                  PVFS_ds_flags flags,
2015                                  PVFS_vtag * vtag,
2016                                  void *user_ptr,
2017                                  job_aint status_user_tag,
2018                                  job_status_s * out_status_p,
2019                                  job_id_t * id,
2020                                  job_context_id context_id,
2021                                  PVFS_hint hints)
2022{
2023    int ret = -1;
2024    struct job_desc *jd = NULL;
2025    void* user_ptr_internal;
2026
2027    /* create the job desc first, even though we may not use it.  This
2028     * gives us somewhere to store the BMI id and user ptr
2029     */
2030    jd = alloc_job_desc(JOB_TROVE);
2031    if (!jd)
2032    {
2033        out_status_p->error_code = -PVFS_ENOMEM;
2034        return 1;
2035    }
2036    jd->job_user_ptr = user_ptr;
2037    jd->u.trove.vtag = vtag;
2038    jd->context_id = context_id;
2039    jd->status_user_tag = status_user_tag;
2040    jd->trove_callback.fn = trove_thread_mgr_callback;
2041    jd->trove_callback.data = (void*)jd;
2042    user_ptr_internal = &jd->trove_callback;
2043
2044#ifdef __PVFS2_TROVE_SUPPORT__
2045    ret = trove_keyval_remove_list(coll_id, handle,
2046                             key_array, val_array, error_array,
2047                             count, flags,
2048                             jd->u.trove.vtag, user_ptr_internal,
2049                             global_trove_context,
2050                             &(jd->u.trove.id),
2051                             hints);
2052#else
2053    gossip_err("Error: Trove support not enabled.\n");
2054    ret = -ENOSYS;
2055#endif
2056
2057    if (ret < 0)
2058    {
2059        /* error posting trove operation */
2060        dealloc_job_desc(jd);
2061        jd = NULL;
2062        out_status_p->error_code = ret;
2063        out_status_p->status_user_tag = status_user_tag;
2064        return (1);
2065    }
2066
2067    if (ret == 1)
2068    {
2069        /* immediate completion */
2070        out_status_p->error_code = 0;
2071        out_status_p->status_user_tag = status_user_tag;
2072        out_status_p->vtag = jd->u.trove.vtag;
2073        dealloc_job_desc(jd);
2074        jd = NULL;
2075        return (ret);
2076    }
2077
2078    /* if we fall through to this point, the job did not
2079     * immediately complete and we must queue up to test later
2080     */
2081    *id = jd->job_id;
2082    trove_pending_count++;
2083
2084    return (0);
2085}
2086
2087
2088/* job_trove_keyval_flush()
2089 *
2090 * ask the storage layer to flush keyvals to disk
2091 *
2092 * returns 0 on success, 1 on immediate completion, and -errno on failure
2093 */
2094
2095int job_trove_keyval_flush(PVFS_fs_id coll_id,
2096                           PVFS_handle handle,
2097                           PVFS_ds_flags flags,
2098                           void * user_ptr,
2099                           job_aint status_user_tag,
2100                           job_status_s * out_status_p,
2101                           job_id_t * id,
2102                           job_context_id context_id,
2103                           PVFS_hint hints)
2104{
2105    int ret = -1;
2106    struct job_desc *jd = NULL;
2107    void* user_ptr_internal;
2108
2109    /* create the job desc first, even though we may not use it.  This
2110     * gives us somewhere to store the BMI id and user ptr
2111     */
2112    jd = alloc_job_desc(JOB_TROVE);
2113    if (!jd)
2114    {
2115        out_status_p->error_code = -PVFS_ENOMEM;
2116        return 1;
2117    }
2118    jd->job_user_ptr = user_ptr;
2119    jd->context_id = context_id;
2120    jd->status_user_tag = status_user_tag;
2121    jd->trove_callback.fn = trove_thread_mgr_callback;
2122    jd->trove_callback.data = (void*)jd;
2123    user_ptr_internal = &jd->trove_callback;
2124
2125#ifdef __PVFS2_TROVE_SUPPORT__
2126    ret = trove_keyval_flush(coll_id, handle, flags, user_ptr_internal,
2127                             global_trove_context, &(jd->u.trove.id), hints);
2128#else
2129    gossip_err("Error: Trove support not enabled.\n");
2130    ret = -ENOSYS;
2131#endif
2132
2133    if (ret < 0)
2134    {
2135        /* error posting trove operation */
2136        dealloc_job_desc(jd);
2137        jd = NULL;
2138        out_status_p->error_code = ret;
2139        out_status_p->status_user_tag = status_user_tag;
2140        return (1);
2141    }
2142
2143    if (ret == 1)
2144    {
2145        /* immediate completion */
2146        out_status_p->error_code = 0;
2147        out_status_p->status_user_tag = status_user_tag;
2148        dealloc_job_desc(jd);
2149        jd = NULL;
2150        return (ret);
2151    }
2152
2153    /* if we fall through to this point, the job did not
2154     * immediately complete and we must queue up to test later
2155     */
2156    *id = jd->job_id;
2157    trove_pending_count++;
2158
2159    return (0);
2160}
2161
2162int job_trove_keyval_get_handle_info(PVFS_fs_id coll_id,
2163                                     PVFS_handle handle,
2164                                     PVFS_ds_flags flags,
2165                                     PVFS_ds_keyval_handle_info *info,
2166                                     void *user_ptr,
2167                                     job_aint status_user_tag,
2168                                     job_status_s * out_status_p,
2169                                     job_id_t * id,
2170                                     job_context_id context_id,
2171                                     PVFS_hint hints)
2172{
2173    /* post a trove operation keyval get handle info.  If it completes (or
2174     * fails) immediately, then return and fill in the status
2175     * structure.  If it needs to be tested for completion later,
2176     * then queue up a job desc structure.
2177     */
2178
2179    int ret = -1;
2180    struct job_desc *jd = NULL;
2181    void* user_ptr_internal;
2182
2183    /* create the job desc first, even though we may not use it.  This
2184     * gives us somewhere to store the BMI id and user ptr
2185     */
2186    jd = alloc_job_desc(JOB_TROVE);
2187    if (!jd)
2188    {
2189        out_status_p->error_code = -PVFS_ENOMEM;
2190        return 1;
2191    }
2192    jd->hints = hints;
2193    jd->job_user_ptr = user_ptr;
2194    jd->context_id = context_id;
2195    jd->status_user_tag = status_user_tag;
2196    jd->trove_callback.fn = trove_thread_mgr_callback;
2197    jd->trove_callback.data = (void*)jd;
2198    user_ptr_internal = &jd->trove_callback;
2199
2200
2201
2202#ifdef __PVFS2_TROVE_SUPPORT__
2203    ret = trove_keyval_get_handle_info(
2204        coll_id,
2205        handle,
2206        flags,
2207        info,
2208        user_ptr_internal,
2209        global_trove_context, &(jd->u.trove.id), hints);
2210#else
2211    gossip_err("Error: Trove support not enabled.\n");
2212    ret = -ENOSYS;
2213#endif
2214
2215    if (ret < 0)
2216    {
2217        /* error posting trove operation */
2218        dealloc_job_desc(jd);
2219        jd = NULL;
2220        out_status_p->error_code = ret;
2221        out_status_p->status_user_tag = status_user_tag;
2222        return (1);
2223    }
2224
2225    if (ret == 1)
2226    {
2227        /* immediate completion */
2228        out_status_p->error_code = 0;
2229        out_status_p->status_user_tag = status_user_tag;
2230        dealloc_job_desc(jd);
2231        jd = NULL;
2232        return (ret);
2233    }
2234
2235    /* if we fall to this point, the job did not immediately complete and
2236     * we must queue up to test it later
2237     */
2238    *id = jd->job_id;
2239    trove_pending_count++;
2240
2241    return (0);
2242}
2243
2244
2245/* job_trove_dspace_getattr()
2246 *
2247 * read generic dspace attributes
2248 *
2249 * returns 0 on success, 1 on immediate completion, and -errno on
2250 * failure
2251 */
2252int job_trove_dspace_getattr(PVFS_fs_id coll_id,
2253                             PVFS_handle handle,
2254                             void *user_ptr,
2255                             PVFS_ds_attributes *out_ds_attr_ptr,
2256                             job_aint status_user_tag,
2257                             job_status_s *out_status_p,
2258                             job_id_t *id,
2259                             job_context_id context_id,
2260                             PVFS_hint hints)
2261{
2262    /* post a trove operation dspace get attr.  If it completes (or
2263     * fails) immediately, then return and fill in the status
2264     * structure.  If it needs to be tested for completion later,
2265     * then queue up a job desc structure.
2266     */
2267
2268    int ret = -1;
2269    struct job_desc *jd = NULL;
2270    void* user_ptr_internal;
2271
2272    /* create the job desc first, even though we may not use it.  This
2273     * gives us somewhere to store the BMI id and user ptr
2274     */
2275    jd = alloc_job_desc(JOB_TROVE);
2276    if (!jd)
2277    {
2278        out_status_p->error_code = -PVFS_ENOMEM;
2279        return 1;
2280    }
2281    jd->job_user_ptr = user_ptr;
2282    jd->context_id = context_id;
2283    jd->status_user_tag = status_user_tag;
2284    jd->trove_callback.fn = trove_thread_mgr_callback;
2285    jd->trove_callback.data = (void*)jd;
2286    user_ptr_internal = &jd->trove_callback;
2287
2288
2289
2290#ifdef __PVFS2_TROVE_SUPPORT__
2291    ret = trove_dspace_getattr(coll_id,
2292                               handle, out_ds_attr_ptr, 0 /* flags */ ,
2293                               user_ptr_internal,
2294                               global_trove_context, &(jd->u.trove.id), hints);
2295#else
2296    gossip_err("Error: Trove support not enabled.\n");
2297    ret = -ENOSYS;
2298#endif
2299
2300    if (ret < 0)
2301    {
2302        /* error posting trove operation */
2303        dealloc_job_desc(jd);
2304        jd = NULL;
2305        out_status_p->error_code = ret;
2306        out_status_p->status_user_tag = status_user_tag;
2307        return (1);
2308    }
2309
2310    if (ret == 1)
2311    {
2312        /* immediate completion */
2313        out_status_p->error_code = 0;
2314        out_status_p->status_user_tag = status_user_tag;
2315        dealloc_job_desc(jd);
2316        jd = NULL;
2317        return (ret);
2318    }
2319
2320    /* if we fall to this point, the job did not immediately complete and
2321     * we must queue up to test it later
2322     */
2323    *id = jd->job_id;
2324    trove_pending_count++;
2325
2326    return (0);
2327}
2328
2329/* job_trove_dspace_getattr_list()
2330 *
2331 * read generic dspace attributes for a set of handles
2332 *
2333 * returns 0 on success, 1 on immediate completion, and -errno on
2334 * failure
2335 */
2336int job_trove_dspace_getattr_list(PVFS_fs_id coll_id,
2337                             int nhandles,
2338                             PVFS_handle *handle_array,
2339                             void *user_ptr,
2340                             PVFS_error *out_error_array,
2341                             PVFS_ds_attributes *out_ds_attr_ptr,
2342                             job_aint status_user_tag,
2343                             job_status_s *out_status_p,
2344                             job_id_t *id,
2345                             job_context_id context_id,
2346                             PVFS_hint hints)
2347{
2348    /* post a trove operation dspace get attr list.  If it completes (or
2349     * fails) immediately, then return and fill in the status
2350     * structure.  If it needs to be tested for completion later,
2351     * then queue up a job desc structure.
2352     */
2353
2354    int ret = -1;
2355    struct job_desc *jd = NULL;
2356    void* user_ptr_internal;
2357
2358    /* create the job desc first, even though we may not use it.  This
2359     * gives us somewhere to store the BMI id and user ptr
2360     */
2361    jd = alloc_job_desc(JOB_TROVE);
2362    if (!jd)
2363    {
2364        out_status_p->error_code = -PVFS_ENOMEM;
2365        return 1;
2366    }
2367    jd->hints = hints;
2368    jd->job_user_ptr = user_ptr;
2369    jd->context_id = context_id;
2370    jd->status_user_tag = status_user_tag;
2371    jd->trove_callback.fn = trove_thread_mgr_callback;
2372    jd->trove_callback.data = (void*)jd;
2373    user_ptr_internal = &jd->trove_callback;
2374
2375
2376
2377#ifdef __PVFS2_TROVE_SUPPORT__
2378    ret = trove_dspace_getattr_list(coll_id,
2379                               nhandles,
2380                               handle_array, out_ds_attr_ptr,
2381                               out_error_array,
2382                               0 /* flags */ ,
2383                               user_ptr_internal,
2384                               global_trove_context, &(jd->u.trove.id), hints);
2385#else
2386    gossip_err("Error: Trove support not enabled.\n");
2387    ret = -ENOSYS;
2388#endif
2389
2390    if (ret < 0)
2391    {
2392        /* error posting trove operation */
2393        dealloc_job_desc(jd);
2394        jd = NULL;
2395        out_status_p->error_code = ret;
2396        out_status_p->status_user_tag = status_user_tag;
2397        return (1);
2398    }
2399
2400    if (ret == 1)
2401    {
2402        /* immediate completion */
2403        out_status_p->error_code = 0;
2404        out_status_p->status_user_tag = status_user_tag;
2405        dealloc_job_desc(jd);
2406        jd = NULL;
2407        return (ret);
2408    }
2409
2410    /* if we fall to this point, the job did not immediately complete and
2411     * we must queue up to test it later
2412     */
2413    *id = jd->job_id;
2414    trove_pending_count++;
2415
2416    return (0);
2417}
2418
2419/* job_trove_dspace_setattr()
2420 *
2421 * write generic dspace attributes
2422 *
2423 * returns 0 on success, 1 on immediate completion, and -errno on
2424 * failure
2425 */
2426int job_trove_dspace_setattr(PVFS_fs_id coll_id,
2427                             PVFS_handle handle,
2428                             PVFS_ds_attributes *ds_attr_p,
2429                             PVFS_ds_flags flags,
2430                             void *user_ptr,
2431                             job_aint status_user_tag,
2432                             job_status_s * out_status_p,
2433                             job_id_t * id,
2434                             job_context_id context_id,
2435                             PVFS_hint hints)
2436{
2437    /* post a trove operation dspace set attr.  If it completes (or
2438     * fails) immediately, then return and fill in the status
2439     * structure.  If it needs to be tested for completion later,
2440     * then queue up a job desc structure.
2441     */
2442
2443    int ret = -1;
2444    struct job_desc *jd = NULL;
2445    void* user_ptr_internal;
2446
2447    /* create the job desc first, even though we may not use it.  This
2448     * gives us somewhere to store the BMI id and user ptr
2449     */
2450    jd = alloc_job_desc(JOB_TROVE);
2451    if (!jd)
2452    {
2453        out_status_p->error_code = -PVFS_ENOMEM;
2454        return 1;
2455    }
2456    jd->job_user_ptr = user_ptr;
2457    jd->context_id = context_id;
2458    jd->status_user_tag = status_user_tag;
2459    jd->trove_callback.fn = trove_thread_mgr_callback;
2460    jd->trove_callback.data = (void*)jd;
2461    user_ptr_internal = &jd->trove_callback;
2462
2463#ifdef __PVFS2_TROVE_SUPPORT__
2464    ret = trove_dspace_setattr(coll_id, handle, ds_attr_p,
2465                               flags,
2466                               user_ptr_internal, global_trove_context,
2467                               &(jd->u.trove.id), hints);
2468#else
2469    gossip_err("Error: Trove support not enabled.\n");
2470    ret = -ENOSYS;
2471#endif
2472
2473    if (ret < 0)
2474    {
2475        /* error posting trove operation */
2476        dealloc_job_desc(jd);
2477        jd = NULL;
2478        out_status_p->error_code = ret;
2479        out_status_p->status_user_tag = status_user_tag;
2480        return (1);
2481    }
2482
2483    if (ret == 1)
2484    {
2485        /* immediate completion */
2486        out_status_p->error_code = 0;
2487        out_status_p->status_user_tag = status_user_tag;
2488        dealloc_job_desc(jd);
2489        jd = NULL;
2490        return (ret);
2491    }
2492
2493    /* if we fall to this point, the job did not immediately complete and
2494     * we must queue up to test it later
2495     */
2496    *id = jd->job_id;
2497    trove_pending_count++;
2498
2499    return (0);
2500}
2501
2502/* job_trove_bstream_resize()
2503 *
2504 * resize (truncate or preallocate) a storage byte stream
2505 *
2506 * returns 0 on success, 1 on immediate completion, and -errno on
2507 * failure
2508 */
2509int job_trove_bstream_resize(PVFS_fs_id coll_id,
2510                             PVFS_handle handle,
2511                             PVFS_size size,
2512                             PVFS_ds_flags flags,
2513                             PVFS_vtag * vtag,
2514                             void *user_ptr,
2515                             job_aint status_user_tag,
2516                             job_status_s * out_status_p,
2517                             job_id_t * id,
2518                             job_context_id context_id,
2519                             PVFS_hint hints)
2520{
2521    /* post a resize trove operation.  If it completes (or
2522     * fails) immediately, then return and fill in the status
2523     * structure.  If it needs to be tested for completion later,
2524     * then queue up a job desc structure.
2525     */
2526
2527    int ret = -1;
2528    struct job_desc *jd = NULL;
2529    void* user_ptr_internal;
2530
2531    /* create the job desc first, even though we may not use it.  This
2532     * gives us somewhere to store the BMI id and user ptr
2533     */
2534    jd = alloc_job_desc(JOB_TROVE);
2535    if (!jd)
2536    {
2537        out_status_p->error_code = -PVFS_ENOMEM;
2538        return 1;
2539    }
2540    jd->job_user_ptr = user_ptr;
2541    jd->context_id = context_id;
2542    jd->status_user_tag = status_user_tag;
2543    jd->trove_callback.fn = trove_thread_mgr_callback;
2544    jd->trove_callback.data = (void*)jd;
2545    user_ptr_internal = &jd->trove_callback;
2546
2547#ifdef __PVFS2_TROVE_SUPPORT__
2548    ret = trove_bstream_resize(coll_id, handle, &size,
2549                               flags,
2550                               vtag, user_ptr_internal, global_trove_context,
2551                               &(jd->u.trove.id), hints);
2552#else
2553    gossip_err("Error: Trove support not enabled.\n");
2554    ret = -ENOSYS;
2555#endif
2556
2557    if (ret < 0)
2558    {
2559        /* error posting trove operation */
2560        dealloc_job_desc(jd);
2561        jd = NULL;
2562        out_status_p->error_code = ret;
2563        out_status_p->status_user_tag = status_user_tag;
2564        return (1);
2565    }
2566
2567    if (ret == 1)
2568    {
2569        /* immediate completion */
2570        out_status_p->error_code = 0;
2571        out_status_p->status_user_tag = status_user_tag;
2572        dealloc_job_desc(jd);
2573        jd = NULL;
2574        return (ret);
2575    }
2576
2577    /* if we fall to this point, the job did not immediately complete and
2578     * we must queue up to test it later
2579     */
2580    *id = jd->job_id;
2581    trove_pending_count++;
2582
2583    return (0);
2584}
2585
2586/* job_trove_bstream_validate()
2587 *
2588 * check consistency of a bytestream for a given vtag
2589 *
2590 * returns 0 on success, 1 on immediate completion, and -errno on
2591 * failure
2592 */
2593int job_trove_bstream_validate(PVFS_fs_id coll_id,
2594                               PVFS_handle handle,
2595                               PVFS_vtag * vtag,
2596                               void *user_ptr,
2597                               job_aint status_user_tag,
2598                               job_status_s * out_status_p,
2599                               job_id_t * id,
2600                               job_context_id context_id,
2601                               PVFS_hint hints)
2602{
2603    gossip_lerr("Error: unimplemented.\n");
2604    out_status_p->error_code = -PVFS_ENOSYS;
2605    return 1;
2606}
2607
2608/* job_trove_keyval_remove()
2609 *
2610 * remove a key/value entry
2611 *
2612 * returns 0 on success, 1 on immediate completion, and -errno on
2613 * failure
2614 */
2615int job_trove_keyval_remove(PVFS_fs_id coll_id,
2616                            PVFS_handle handle,
2617                            PVFS_ds_keyval * key_p,
2618                            PVFS_ds_keyval * val_p,
2619                            PVFS_ds_flags flags,
2620                            PVFS_vtag * vtag,
2621                            void *user_ptr,
2622                            job_aint status_user_tag,
2623                            job_status_s * out_status_p,
2624                            job_id_t * id,
2625                            job_context_id context_id,
2626                            PVFS_hint hints)
2627{
2628    /* post a trove keyval remove.  If it completes (or fails)
2629     * immediately, then return and fill in the status structure.
2630     * If it needs to be tested for completion later, then queue
2631     * up a job_desc structure.  */
2632    int ret = -1;
2633    struct job_desc *jd = NULL;
2634    void* user_ptr_internal;
2635
2636    /* create the job desc first, even though we may not use it.  This
2637     * gives us somewhere to store the BMI id and user ptr
2638     */
2639    jd = alloc_job_desc(JOB_TROVE);
2640    if (!jd)
2641    {
2642        out_status_p->error_code = -PVFS_ENOMEM;
2643        return 1;
2644    }
2645    jd->hints = hints;
2646    jd->job_user_ptr = user_ptr;
2647    jd->u.trove.vtag = vtag;
2648    jd->context_id = context_id;
2649    jd->status_user_tag = status_user_tag;
2650    jd->trove_callback.fn = trove_thread_mgr_callback;
2651    jd->trove_callback.data = (void*)jd;
2652    user_ptr_internal = &jd->trove_callback;
2653
2654#ifdef __PVFS2_TROVE_SUPPORT__
2655    ret = trove_keyval_remove(coll_id, handle, key_p, val_p, flags,
2656                              jd->u.trove.vtag, user_ptr_internal,
2657                              global_trove_context, &(jd->u.trove.id), hints);
2658#else
2659    gossip_err("Error: Trove support not enabled.\n");
2660    ret = -ENOSYS;
2661#endif
2662
2663    if (ret < 0)
2664    {
2665        /* error posting trove operation */
2666        dealloc_job_desc(jd);
2667        jd = NULL;
2668        out_status_p->error_code = ret;
2669        out_status_p->status_user_tag = status_user_tag;
2670        return (1);
2671    }
2672
2673    if (ret == 1)
2674    {
2675        /* immediate completion */
2676        out_status_p->error_code = 0;
2677        out_status_p->status_user_tag = status_user_tag;
2678        out_status_p->vtag = jd->u.trove.vtag;
2679        dealloc_job_desc(jd);
2680        jd = NULL;
2681        return (ret);
2682    }
2683
2684    /* if we fall through to this point, the job did not
2685     * immediately complete and we must queue up to test later
2686     */
2687    *id = jd->job_id;
2688    trove_pending_count++;
2689
2690    return (0);
2691}
2692
2693/* job_trove_keyval_validate()
2694 *
2695 * check consistency of a key/value pair for a given vtag
2696 *
2697 * returns 0 on success, 1 on immediate completion, and -errno on
2698 * failure
2699 */
2700int job_trove_keyval_validate(PVFS_fs_id coll_id,
2701                              PVFS_handle handle,
2702                              PVFS_vtag * vtag,
2703                              void *user_ptr,
2704                              job_aint status_user_tag,
2705                              job_status_s * out_status_p,
2706                              job_id_t * id,
2707                              job_context_id context_id,
2708                              PVFS_hint hints)
2709{
2710    gossip_lerr("Error: unimplemented.\n");
2711    out_status_p->error_code = -PVFS_ENOSYS;
2712    return 1;
2713}
2714
2715/* job_trove_keyval_iterate()
2716 *
2717 * iterate through all of the key/value pairs for a data space
2718 *
2719 * returns 0 on success, 1 on immediate completion, and -errno on
2720 * failure
2721 */
2722int job_trove_keyval_iterate(PVFS_fs_id coll_id,
2723                             PVFS_handle handle,
2724                             PVFS_ds_position position,
2725                             PVFS_ds_keyval * key_array,
2726                             PVFS_ds_keyval * val_array,
2727                             int count,
2728                             PVFS_ds_flags flags,
2729                             PVFS_vtag * vtag,
2730                             void *user_ptr,
2731                             job_aint status_user_tag,
2732                             job_status_s * out_status_p,
2733                             job_id_t * id,
2734                             job_context_id context_id,
2735                             PVFS_hint hints)
2736{
2737    /* post a trove keyval iterate.  If it completes (or fails)
2738     * immediately, then return and fill in the status structure.
2739     * If it needs to be tested for completion later, then queue
2740     * up a job_desc structure.  */
2741    int ret = -1;
2742    struct job_desc *jd = NULL;
2743    void* user_ptr_internal;
2744
2745    /* create the job desc first, even though we may not use it.  This
2746     * gives us somewhere to store the BMI id and user ptr
2747     */
2748    jd = alloc_job_desc(JOB_TROVE);
2749    if (!jd)
2750    {
2751        out_status_p->error_code = -PVFS_ENOMEM;
2752        return 1;
2753    }
2754    jd->hints = hints;
2755    jd->job_user_ptr = user_ptr;
2756    jd->u.trove.vtag = vtag;
2757    jd->u.trove.position = position;
2758    jd->u.trove.count = count;
2759    jd->context_id = context_id;
2760    jd->status_user_tag = status_user_tag;
2761    jd->trove_callback.fn = trove_thread_mgr_callback;
2762    jd->trove_callback.data = (void*)jd;
2763    user_ptr_internal = &jd->trove_callback;
2764
2765#ifdef __PVFS2_TROVE_SUPPORT__
2766    ret = trove_keyval_iterate(coll_id, handle,
2767                               &(jd->u.trove.position), key_array, val_array,
2768                               &(jd->u.trove.count), flags, jd->u.trove.vtag,
2769                               user_ptr_internal,
2770                               global_trove_context, &(jd->u.trove.id), hints);
2771#else
2772    gossip_err("Error: Trove support not enabled.\n");
2773    ret = -ENOSYS;
2774#endif
2775
2776    if (ret < 0)
2777    {
2778        /* error posting trove operation */
2779        dealloc_job_desc(jd);
2780        jd = NULL;
2781        out_status_p->error_code = ret;
2782        out_status_p->status_user_tag = status_user_tag;
2783        return (1);
2784    }
2785
2786    if (ret == 1)
2787    {
2788        /* immediate completion */
2789        out_status_p->error_code = 0;
2790        out_status_p->status_user_tag = status_user_tag;
2791        out_status_p->vtag = jd->u.trove.vtag;
2792        out_status_p->position = jd->u.trove.position;
2793        out_status_p->count = jd->u.trove.count;
2794        dealloc_job_desc(jd);
2795        jd = NULL;
2796        return (ret);
2797    }
2798
2799    /* if we fall through to this point, the job did not
2800     * immediately complete and we must queue up to test later
2801     */
2802    *id = jd->job_id;
2803    trove_pending_count++;
2804
2805    return (0);
2806}
2807
2808/* job_trove_keyval_iterate_keys()
2809 *
2810 * iterate through all of the keys for a data space
2811 *
2812 * returns 0 on success, 1 on immediate completion, and -errno on
2813 * failure
2814 */
2815int job_trove_keyval_iterate_keys(PVFS_fs_id coll_id,
2816                             PVFS_handle handle,
2817                             PVFS_ds_position position,
2818                             PVFS_ds_keyval * key_array,
2819                             int count,
2820                             PVFS_ds_flags flags,
2821                             PVFS_vtag * vtag,
2822                             void *user_ptr,
2823                             job_aint status_user_tag,
2824                             job_status_s * out_status_p,
2825                             job_id_t * id,
2826                             job_context_id context_id,
2827                             PVFS_hint hints)
2828{
2829    /* post a trove keyval iterate_keys.  If it completes (or fails)
2830     * immediately, then return and fill in the status structure.
2831     * If it needs to be tested for completion later, then queue
2832     * up a job_desc structure.  */
2833    int ret = -1;
2834    struct job_desc *jd = NULL;
2835    void* user_ptr_internal;
2836
2837    /* create the job desc first, even though we may not use it.  This
2838     * gives us somewhere to store the BMI id and user ptr
2839     */
2840    jd = alloc_job_desc(JOB_TROVE);
2841    if (!jd)
2842    {
2843        out_status_p->error_code = -PVFS_ENOMEM;
2844        return 1;
2845    }
2846    jd->hints = hints;
2847    jd->job_user_ptr = user_ptr;
2848    jd->u.trove.vtag = vtag;
2849    jd->u.trove.position = position;
2850    jd->u.trove.count = count;
2851    jd->context_id = context_id;
2852    jd->status_user_tag = status_user_tag;
2853    jd->trove_callback.fn = trove_thread_mgr_callback;
2854    jd->trove_callback.data = (void*)jd;
2855    user_ptr_internal = &jd->trove_callback;
2856
2857#ifdef __PVFS2_TROVE_SUPPORT__
2858    ret = trove_keyval_iterate_keys(coll_id, handle,
2859                               &(jd->u.trove.position), key_array,
2860                               &(jd->u.trove.count), flags, jd->u.trove.vtag,
2861                               user_ptr_internal,
2862                               global_trove_context, &(jd->u.trove.id), hints);
2863#else
2864    gossip_err("Error: Trove support not enabled.\n");
2865    ret = -ENOSYS;
2866#endif
2867
2868    if (ret < 0)
2869    {
2870        /* error posting trove operation */
2871        dealloc_job_desc(jd);
2872        jd = NULL;
2873        out_status_p->error_code = ret;
2874        out_status_p->status_user_tag = status_user_tag;
2875        return (1);
2876    }
2877
2878    if (ret == 1)
2879    {
2880        /* immediate completion */
2881        out_status_p->error_code = 0;
2882        out_status_p->status_user_tag = status_user_tag;
2883        out_status_p->vtag = jd->u.trove.vtag;
2884        out_status_p->position = jd->u.trove.position;
2885        out_status_p->count = jd->u.trove.count;
2886        dealloc_job_desc(jd);
2887        jd = NULL;
2888        return (ret);
2889    }
2890
2891    /* if we fall through to this point, the job did not
2892     * immediately complete and we must queue up to test later
2893     */
2894    *id = jd->job_id;
2895    trove_pending_count++;
2896
2897    return (0);
2898}
2899
2900/* job_trove_dspace_iterate_handles()
2901 *
2902 * iterates through all of the handles in a given collection
2903 *
2904 * returns 0 on success, 1 on immediate completion, -PVFS_error on failure
2905 */
2906int job_trove_dspace_iterate_handles(PVFS_fs_id coll_id,
2907    PVFS_ds_position position,
2908    PVFS_handle* handle_array,
2909    int count,
2910    PVFS_ds_flags flags,
2911    PVFS_vtag* vtag,
2912    void* user_ptr,
2913    job_aint status_user_tag,
2914    job_status_s* out_status_p,
2915    job_id_t* id,
2916    job_context_id context_id)
2917{
2918    /* post a trove keyval iterate_handles.  If it completes (or fails)
2919     * immediately, then return and fill in the status structure.
2920     * If it needs to be tested for completion later, then queue
2921     * up a job_desc structure.  */
2922    int ret = -1;
2923    struct job_desc *jd = NULL;
2924    void* user_ptr_internal;
2925
2926    /* create the job desc first, even though we may not use it.  This
2927     * gives us somewhere to store the BMI id and user ptr
2928     */
2929    jd = alloc_job_desc(JOB_TROVE);
2930    if (!jd)
2931    {
2932        out_status_p->error_code = -PVFS_ENOMEM;
2933        return 1;
2934    }
2935    jd->job_user_ptr = user_ptr;
2936    jd->u.trove.vtag = vtag;
2937    jd->u.trove.position = position;
2938    jd->u.trove.count = count;
2939    jd->context_id = context_id;
2940    jd->status_user_tag = status_user_tag;
2941    jd->trove_callback.fn = trove_thread_mgr_callback;
2942    jd->trove_callback.data = (void*)jd;
2943    user_ptr_internal = &jd->trove_callback;
2944
2945#ifdef __PVFS2_TROVE_SUPPORT__
2946    ret = trove_dspace_iterate_handles(coll_id,
2947                               &(jd->u.trove.position), handle_array,
2948                               &(jd->u.trove.count), flags, jd->u.trove.vtag,
2949                               user_ptr_internal,
2950                               global_trove_context, &(jd->u.trove.id));
2951#else
2952    gossip_err("Error: Trove support not enabled.\n");
2953    ret = -ENOSYS;
2954#endif
2955
2956    if (ret < 0)
2957    {
2958        /* error posting trove operation */
2959        dealloc_job_desc(jd);
2960        jd = NULL;
2961        out_status_p->error_code = ret;
2962        out_status_p->status_user_tag = status_user_tag;
2963        return (1);
2964    }
2965
2966    if (ret == 1)
2967    {
2968        /* immediate completion */
2969        out_status_p->error_code = 0;
2970        out_status_p->status_user_tag = status_user_tag;
2971        out_status_p->vtag = jd->u.trove.vtag;
2972        out_status_p->position = jd->u.trove.position;
2973        out_status_p->count = jd->u.trove.count;
2974        dealloc_job_desc(jd);
2975        jd = NULL;
2976        return (ret);
2977    }
2978
2979    /* if we fall through to this point, the job did not
2980     * immediately complete and we must queue up to test later
2981     */
2982    *id = jd->job_id;
2983    trove_pending_count++;
2984
2985    return (0);
2986}
2987
2988
2989/* job_trove_dspace_create()
2990 *
2991 * create a new data space object
2992 *
2993 * returns 0 on success, 1 on immediate completion, and -errno on
2994 * failure
2995 */
2996int job_trove_dspace_create(PVFS_fs_id coll_id,
2997                            PVFS_handle_extent_array *handle_extent_array,
2998                            PVFS_ds_type type,
2999                            void *hint,
3000                            PVFS_ds_flags flags,
3001                            void *user_ptr,
3002                            job_aint status_user_tag,
3003                            job_status_s * out_status_p,
3004                            job_id_t * id,
3005                            job_context_id context_id,
3006                            PVFS_hint hints)
3007{
3008    /* post a dspace create.  If it completes (or fails) immediately, then
3009     * return and fill in the status structure.  If it needs to be tested
3010     * for completion later, then queue up a job_desc structure.
3011     */
3012    int ret = -1;
3013    struct job_desc *jd = NULL;
3014    void* user_ptr_internal;
3015
3016    /* create the job desc first, even though we may not use it.  This
3017     * gives us somewhere to store the BMI id and user ptr
3018     */
3019    jd = alloc_job_desc(JOB_TROVE);
3020    if (!jd)
3021    {
3022        out_status_p->error_code = -PVFS_ENOMEM;
3023        return 1;
3024    }
3025    jd->hints = hints;
3026    jd->job_user_ptr = user_ptr;
3027    jd->u.trove.handle = PVFS_HANDLE_NULL;
3028    jd->context_id = context_id;
3029    jd->status_user_tag = status_user_tag;
3030    jd->trove_callback.fn = trove_thread_mgr_callback;
3031    jd->trove_callback.data = (void*)jd;
3032    user_ptr_internal = &jd->trove_callback;
3033
3034
3035
3036#ifdef __PVFS2_TROVE_SUPPORT__
3037    ret = trove_dspace_create(coll_id,
3038                              handle_extent_array,
3039                              &(jd->u.trove.handle),
3040                              type,
3041                              hint, flags,
3042                              user_ptr_internal,
3043                              global_trove_context, &(jd->u.trove.id), hints);
3044#else
3045    gossip_err("Error: Trove support not enabled.\n");
3046    ret = -ENOSYS;
3047#endif
3048
3049    if (ret < 0)
3050    {
3051        /* error posting trove operation */
3052        dealloc_job_desc(jd);
3053        jd = NULL;
3054        out_status_p->error_code = ret;
3055        out_status_p->status_user_tag = status_user_tag;
3056        return (1);
3057    }
3058
3059    if (ret == 1)
3060    {
3061        /* immediate completion */
3062        out_status_p->error_code = 0;
3063        out_status_p->status_user_tag = status_user_tag;
3064        out_status_p->handle = jd->u.trove.handle;
3065        dealloc_job_desc(jd);
3066        jd = NULL;
3067        return (ret);
3068    }
3069
3070    /* if we fall through to this point, the job did not
3071     * immediately complete and we must queue up to test later
3072     */
3073    *id = jd->job_id;
3074    trove_pending_count++;
3075
3076    return (0);
3077}
3078
3079/* job_trove_dspace_create_list()
3080 *
3081 * create a new data space object
3082 *
3083 * returns 0 on success, 1 on immediate completion, and -errno on
3084 * failure
3085 */
3086int job_trove_dspace_create_list(PVFS_fs_id coll_id,
3087                            PVFS_handle_extent_array *handle_extent_array,
3088                            PVFS_handle* out_handle_array,
3089                            int count,
3090                            PVFS_ds_type type,
3091                            void *hint,
3092                            PVFS_ds_flags flags,
3093                            void *user_ptr,
3094                            job_aint status_user_tag,
3095                            job_status_s * out_status_p,
3096                            job_id_t * id,
3097                            job_context_id context_id,
3098                            PVFS_hint hints)
3099{
3100    /* post a dspace create list.  If it completes (or fails) immediately, then
3101     * return and fill in the status structure.  If it needs to be tested
3102     * for completion later, then queue up a job_desc structure.
3103     */
3104    int ret = -1;
3105    struct job_desc *jd = NULL;
3106    void* user_ptr_internal;
3107
3108    /* create the job desc first, even though we may not use it.  This
3109     * gives us somewhere to store the BMI id and user ptr
3110     */
3111    jd = alloc_job_desc(JOB_TROVE);
3112    if (!jd)
3113    {
3114        out_status_p->error_code = -PVFS_ENOMEM;
3115        return 1;
3116    }
3117    jd->job_user_ptr = user_ptr;
3118    jd->u.trove.handle = PVFS_HANDLE_NULL;
3119    jd->context_id = context_id;
3120    jd->status_user_tag = status_user_tag;
3121    jd->trove_callback.fn = trove_thread_mgr_callback;
3122    jd->trove_callback.data = (void*)jd;
3123    user_ptr_internal = &jd->trove_callback;
3124
3125#ifdef __PVFS2_TROVE_SUPPORT__
3126    ret = trove_dspace_create_list(coll_id,
3127                              handle_extent_array,
3128                              out_handle_array,
3129                              count,
3130                              type,
3131                              hint, flags,
3132                              user_ptr_internal,
3133                              global_trove_context, &(jd->u.trove.id),
3134                              hints);
3135#else
3136    gossip_err("Error: Trove support not enabled.\n");
3137    ret = -ENOSYS;
3138#endif
3139
3140    if (ret < 0)
3141    {
3142        /* error posting trove operation */
3143        dealloc_job_desc(jd);
3144        jd = NULL;
3145        out_status_p->error_code = ret;
3146        out_status_p->status_user_tag = status_user_tag;
3147        return (1);
3148    }
3149
3150    if (ret == 1)
3151    {
3152        /* immediate completion */
3153        out_status_p->error_code = 0;
3154        out_status_p->status_user_tag = status_user_tag;
3155        dealloc_job_desc(jd);
3156        jd = NULL;
3157        return (ret);
3158    }
3159
3160    /* if we fall through to this point, the job did not
3161     * immediately complete and we must queue up to test later
3162     */
3163    *id = jd->job_id;
3164    trove_pending_count++;
3165
3166    return (0);
3167}
3168
3169/* job_trove_dspace_remove_list()
3170 *
3171 * remove a list of data space objects (byte stream and key/value)
3172 *
3173 * returns 0 on success, 1 on immediate completion, and -errno on
3174 * failure
3175 */
3176int job_trove_dspace_remove_list(PVFS_fs_id coll_id,
3177                            PVFS_handle* handle_array,
3178                            PVFS_error *out_error_array,
3179                            int count,
3180                            PVFS_ds_flags flags,
3181                            void *user_ptr,
3182                            job_aint status_user_tag,
3183                            job_status_s * out_status_p,
3184                            job_id_t * id,
3185                            job_context_id context_id,
3186                            PVFS_hint hints)
3187{
3188    /* post a dspace remove_list.  If it completes (or fails) immediately, then
3189     * return and fill in the status structure.  If it needs to be tested
3190     * for completion later, then queue up a job_desc structure.
3191     */
3192    int ret = -1;
3193    struct job_desc *jd = NULL;
3194    void* user_ptr_internal;
3195
3196    /* create the job desc first, even though we may not use it.  This
3197     * gives us somewhere to store the BMI id and user ptr
3198     */
3199    jd = alloc_job_desc(JOB_TROVE);
3200    if (!jd)
3201    {
3202        out_status_p->error_code = -PVFS_ENOMEM;
3203        return 1;
3204    }
3205    jd->job_user_ptr = user_ptr;
3206    jd->context_id = context_id;
3207    jd->status_user_tag = status_user_tag;
3208    jd->trove_callback.fn = trove_thread_mgr_callback;
3209    jd->trove_callback.data = (void*)jd;
3210    user_ptr_internal = &jd->trove_callback;
3211
3212#ifdef __PVFS2_TROVE_SUPPORT__
3213    ret = trove_dspace_remove_list(coll_id,
3214                              handle_array,
3215                              out_error_array,
3216                              count,
3217                              flags,
3218                              user_ptr_internal,
3219                              global_trove_context, &(jd->u.trove.id),
3220                              hints);
3221#else
3222    gossip_err("Error: Trove support not enabled.\n");
3223    ret = -ENOSYS;
3224#endif
3225
3226    if (ret < 0)
3227    {
3228        /* error posting trove operation */
3229        dealloc_job_desc(jd);
3230        jd = NULL;
3231        out_status_p->error_code = ret;
3232        out_status_p->status_user_tag = status_user_tag;
3233        return (1);
3234    }
3235
3236    if (ret == 1)
3237    {
3238        /* immediate completion */
3239        out_status_p->error_code = 0;
3240        out_status_p->status_user_tag = status_user_tag;
3241        dealloc_job_desc(jd);
3242        jd = NULL;
3243        return (ret);
3244    }
3245
3246    /* if we fall through to this point, the job did not
3247     * immediately complete and we must queue up to test later
3248     */
3249    *id = jd->job_id;
3250    trove_pending_count++;
3251
3252    return (0);
3253}
3254
3255
3256
3257/* job_trove_dspace_remove()
3258 *
3259 * remove an entire data space object (byte stream and key/value)
3260 *
3261 * returns 0 on success, 1 on immediate completion, and -errno on
3262 * failure
3263 */
3264int job_trove_dspace_remove(PVFS_fs_id coll_id,
3265                            PVFS_handle handle,
3266                            PVFS_ds_flags flags,
3267                            void *user_ptr,
3268                            job_aint status_user_tag,
3269                            job_status_s * out_status_p,
3270                            job_id_t * id,
3271                            job_context_id context_id,
3272                            PVFS_hint hints)
3273{
3274    /* post a dspace remove.  If it completes (or fails) immediately, then
3275     * return and fill in the status structure.  If it needs to be tested
3276     * for completion later, then queue up a job_desc structure.
3277     */
3278    int ret = -1;
3279    struct job_desc *jd = NULL;
3280    void* user_ptr_internal;
3281
3282    /* create the job desc first, even though we may not use it.  This
3283     * gives us somewhere to store the BMI id and user ptr
3284     */
3285    jd = alloc_job_desc(JOB_TROVE);
3286    if (!jd)
3287    {
3288        out_status_p->error_code = -PVFS_ENOMEM;
3289        return 1;
3290    }
3291    jd->job_user_ptr = user_ptr;
3292    jd->context_id = context_id;
3293    jd->status_user_tag = status_user_tag;
3294    jd->trove_callback.fn = trove_thread_mgr_callback;
3295    jd->trove_callback.data = (void*)jd;
3296    user_ptr_internal = &jd->trove_callback;
3297
3298
3299
3300#ifdef __PVFS2_TROVE_SUPPORT__
3301    ret = trove_dspace_remove(coll_id,
3302                              handle, flags,
3303                              user_ptr_internal,
3304                              global_trove_context, &(jd->u.trove.id), hints);
3305#else
3306    gossip_err("Error: Trove support not enabled.\n");
3307    ret = -ENOSYS;
3308#endif
3309
3310    if (ret < 0)
3311    {
3312        /* error posting trove operation */
3313        dealloc_job_desc(jd);
3314        jd = NULL;
3315        out_status_p->error_code = ret;
3316        out_status_p->status_user_tag = status_user_tag;
3317        return (1);
3318    }
3319
3320    if (ret == 1)
3321    {
3322        /* immediate completion */
3323        out_status_p->error_code = 0;
3324        out_status_p->status_user_tag = status_user_tag;
3325        dealloc_job_desc(jd);
3326        jd = NULL;
3327        return (ret);
3328    }
3329
3330    /* if we fall through to this point, the job did not
3331     * immediately complete and we must queue up to test later
3332     */
3333    *id = jd->job_id;
3334    trove_pending_count++;
3335
3336    return (0);
3337}
3338
3339/* job_trove_dspace_verify()
3340 *
3341 * verify that a given dataspace exists and discover its type
3342 *
3343 * returns 0 on success, 1 on immediate completion, and -errno on
3344 * failure
3345 */
3346int job_trove_dspace_verify(PVFS_fs_id coll_id,
3347                            PVFS_handle handle,
3348                            PVFS_ds_flags flags,
3349                            void *user_ptr,
3350                            job_aint status_user_tag,
3351                            job_status_s * out_status_p,
3352                            job_id_t * id,
3353                            job_context_id context_id,
3354                            PVFS_hint hints)
3355{
3356    /* post a dspace verify.  If it completes (or fails) immediately, then
3357     * return and fill in the status structure.  If it needs to be tested
3358     * for completion later, then queue up a job_desc structure.
3359     */
3360    int ret = -1;
3361    struct job_desc *jd = NULL;
3362    void* user_ptr_internal;
3363
3364    /* create the job desc first, even though we may not use it.  This
3365     * gives us somewhere to store the BMI id and user ptr
3366     */
3367    jd = alloc_job_desc(JOB_TROVE);
3368    if (!jd)
3369    {
3370        out_status_p->error_code = -PVFS_ENOMEM;
3371        return 1;
3372    }
3373    jd->job_user_ptr = user_ptr;
3374    jd->context_id = context_id;
3375    jd->status_user_tag = status_user_tag;
3376    jd->trove_callback.fn = trove_thread_mgr_callback;
3377    jd->trove_callback.data = (void*)jd;
3378    user_ptr_internal = &jd->trove_callback;
3379
3380
3381
3382#ifdef __PVFS2_TROVE_SUPPORT__
3383    ret = trove_dspace_verify(coll_id,
3384                              handle, &jd->u.trove.type,
3385                              flags,
3386                              user_ptr_internal, global_trove_context, &(jd->u.trove.id), hints);
3387#else
3388    gossip_err("Error: Trove support not enabled.\n");
3389    ret = -ENOSYS;
3390#endif
3391
3392    if (ret < 0)
3393    {
3394        /* error posting trove operation */
3395        dealloc_job_desc(jd);
3396        jd = NULL;
3397        /* the trove_method will determine what value is returned in immediate
3398         * completion case */
3399        out_status_p->error_code = ret;
3400        out_status_p->status_user_tag = status_user_tag;
3401        return (1);
3402    }
3403
3404    if (ret == 1)
3405    {
3406        /* immediate completion */
3407        out_status_p->error_code = 0;
3408        out_status_p->status_user_tag = status_user_tag;
3409        dealloc_job_desc(jd);
3410        jd = NULL;
3411        return (ret);
3412    }
3413
3414    /* if we fall through to this point, the job did not
3415     * immediately complete and we must queue up to test later
3416     */
3417    *id = jd->job_id;
3418    trove_pending_count++;
3419
3420    return (0);
3421}
3422
3423/* job_trove_dspace_cancel()
3424 *
3425 * used to cancel a trove dspace operation in progress
3426 *
3427 * returns 0 on success, 1 on immediate completion, and -errno on
3428 * failure
3429 */
3430int job_trove_dspace_cancel(PVFS_fs_id coll_id,
3431                            job_id_t id,
3432                            job_context_id context_id)
3433{
3434    struct job_desc* query = NULL;
3435    int ret = -1;
3436
3437    gen_mutex_lock(&completion_mutex);
3438
3439    query = id_gen_safe_lookup(id);
3440    if (!query || query->completed_flag)
3441    {
3442        /* job has already completed, no cancellation needed */
3443        gen_mutex_unlock(&completion_mutex);
3444        return(0);
3445    }
3446
3447    /* tell thread mgr to cancel operation.  This will result in normal
3448     * completion path through thread mgr callbacks; no more work to do here */
3449    ret = PINT_thread_mgr_trove_cancel(
3450        query->u.trove.id, coll_id, &(query->trove_callback));
3451
3452    gen_mutex_unlock(&completion_mutex);
3453
3454    return(ret);
3455}
3456
3457
3458/* job_trove_fs_create()
3459 *
3460 * create a new file system
3461 *
3462 * returns 0 on success, 1 on immediate completion, and -errno on
3463 * failure
3464 */
3465int job_trove_fs_create(char *collname,
3466                        PVFS_fs_id new_coll_id,
3467                        void *user_ptr,
3468                        job_aint status_user_tag,
3469                        job_status_s * out_status_p,
3470                        job_id_t * id,
3471                        job_context_id context_id)
3472{
3473    /* post an fs create.  If it completes (or fails) immediately, then
3474     * return and fill in the status structure.  If it needs to be tested
3475     * for completion later, then queue up a job_desc structure.
3476     */
3477    int ret = -1;
3478    struct job_desc *jd = NULL;
3479    void* user_ptr_internal;
3480
3481    /* create the job desc first, even though we may not use it.  This
3482     * gives us somewhere to store the BMI id and user ptr
3483     */
3484    jd = alloc_job_desc(JOB_TROVE);
3485    if (!jd)
3486    {
3487        out_status_p->error_code = -PVFS_ENOMEM;
3488        return 1;
3489    }
3490    jd->job_user_ptr = user_ptr;
3491    jd->context_id = context_id;
3492    jd->status_user_tag = status_user_tag;
3493    jd->trove_callback.fn = trove_thread_mgr_callback;
3494    jd->trove_callback.data = (void*)jd;
3495    user_ptr_internal = &jd->trove_callback;
3496
3497#ifdef __PVFS2_TROVE_SUPPORT__
3498    ret = trove_collection_create(collname, new_coll_id, user_ptr_internal,
3499        &(jd->u.trove.id));
3500#else
3501    gossip_err("Error: Trove support not enabled.\n");
3502    ret = -ENOSYS;
3503#endif
3504
3505    if (ret < 0)
3506    {
3507        /* error posting trove operation */
3508        dealloc_job_desc(jd);
3509        jd = NULL;
3510        out_status_p->error_code = ret;
3511        out_status_p->status_user_tag = status_user_tag;
3512        return (1);
3513    }
3514
3515    if (ret == 1)
3516    {
3517        /* immediate completion */
3518        out_status_p->error_code = 0;
3519        out_status_p->status_user_tag = status_user_tag;
3520        dealloc_job_desc(jd);
3521        jd = NULL;
3522        return (ret);
3523    }
3524
3525    /* if we fall through to this point, the job did not
3526     * immediately complete and we must queue up to test later
3527     */
3528    *id = jd->job_id;
3529    trove_pending_count++;
3530
3531    return (0);
3532}
3533
3534/* job_trove_fs_remove()
3535 *
3536 * remove an existing file system
3537 *
3538 * returns 0 on success, 1 on immediate completion, and -errno on
3539 * failure
3540 */
3541int job_trove_fs_remove(char *collname,
3542                        void *user_ptr,
3543                        job_aint status_user_tag,
3544                        job_status_s * out_status_p,
3545                        job_id_t * id,
3546                        job_context_id context_id)
3547{
3548    gossip_lerr("Error: unimplemented.\n");
3549    out_status_p->error_code = -PVFS_ENOSYS;
3550    return 1;
3551}
3552
3553/* job_trove_fs_lookup()
3554 *
3555 * lookup a file system based on a string name
3556 *
3557 * returns 0 on success, 1 on immediate completion, and -errno on
3558 * failure
3559 */
3560int job_trove_fs_lookup(char *collname,
3561                        void *user_ptr,
3562                        job_aint status_user_tag,
3563                        job_status_s * out_status_p,
3564                        job_id_t * id,
3565                        job_context_id context_id)
3566{
3567    /* post a collection lookup.  If it completes (or fails) immediately, then
3568     * return and fill in the status structure.  If it needs to be tested
3569     * for completion later, then queue up a job_desc structure.
3570     */
3571    int ret = -1;
3572    struct job_desc *jd = NULL;
3573    void* user_ptr_internal;
3574
3575    /* create the job desc first, even though we may not use it.  This
3576     * gives us somewhere to store the BMI id and user ptr
3577     */
3578    jd = alloc_job_desc(JOB_TROVE);
3579    if (!jd)
3580    {
3581        out_status_p->error_code = -PVFS_ENOMEM;
3582        return 1;
3583    }
3584    jd->job_user_ptr = user_ptr;
3585    jd->context_id = context_id;
3586    jd->status_user_tag = status_user_tag;
3587    jd->trove_callback.fn = trove_thread_mgr_callback;
3588    jd->trove_callback.data = (void*)jd;
3589    user_ptr_internal = &jd->trove_callback;
3590
3591#ifdef __PVFS2_TROVE_SUPPORT__
3592    ret = trove_collection_lookup(
3593        TROVE_METHOD_DBPF,
3594        collname, &(jd->u.trove.fsid),
3595        user_ptr_internal, &(jd->u.trove.id));
3596#else
3597    gossip_err("Error: Trove support not enabled.\n");
3598    ret = -ENOSYS;
3599#endif
3600
3601    if (ret < 0)
3602    {
3603        /* error posting trove operation */
3604        dealloc_job_desc(jd);
3605        jd = NULL;
3606        out_status_p->error_code = ret;
3607        out_status_p->status_user_tag = status_user_tag;
3608        return (1);
3609    }
3610
3611    if (ret == 1)
3612    {
3613        /* immediate completion */
3614        out_status_p->error_code = 0;
3615        out_status_p->status_user_tag = status_user_tag;
3616        out_status_p->coll_id = jd->u.trove.fsid;
3617        dealloc_job_desc(jd);
3618        jd = NULL;
3619        return (ret);
3620    }
3621
3622    /* there is no way we can test on this if we don't know the coll_id */
3623    gossip_lerr("Error: trove_collection_lookup() returned 0 ???\n");
3624
3625    out_status_p->error_code = -PVFS_EINVAL;
3626    return 1;
3627}
3628
3629/* job_trove_fs_set_eattr()
3630 *
3631 * sets extended attributes for a file system
3632 *
3633 * returns 0 on success, 1 on immediate completion, and -errno on
3634 * failure
3635 */
3636int job_trove_fs_seteattr(PVFS_fs_id coll_id,
3637                          PVFS_ds_keyval * key_p,
3638                          PVFS_ds_keyval * val_p,
3639                          PVFS_ds_flags flags,
3640                          void *user_ptr,
3641                          job_aint status_user_tag,
3642                          job_status_s * out_status_p,
3643                          job_id_t * id,
3644                          job_context_id context_id,
3645                          PVFS_hint hints)
3646{
3647    /* post a trove collection set eattr.  If it completes (or fails)
3648     * immediately, then return and fill in the status structure.
3649     * If it needs to be tested for completion later, then queue
3650     * up a job_desc structure.  */
3651    int ret = -1;
3652    struct job_desc *jd = NULL;
3653    void* user_ptr_internal;
3654
3655    /* create the job desc first, even though we may not use it.  This
3656     * gives us somewhere to store the BMI id and user ptr
3657     */
3658    jd = alloc_job_desc(JOB_TROVE);
3659    if (!jd)
3660    {
3661        out_status_p->error_code = -PVFS_ENOMEM;
3662        return 1;
3663    }
3664    jd->hints = hints;
3665    jd->job_user_ptr = user_ptr;
3666    jd->context_id = context_id;
3667    jd->status_user_tag = status_user_tag;
3668    jd->trove_callback.fn = trove_thread_mgr_callback;
3669    jd->trove_callback.data = (void*)jd;
3670    user_ptr_internal = &jd->trove_callback;
3671
3672#ifdef __PVFS2_TROVE_SUPPORT__
3673    ret = trove_collection_seteattr(coll_id, key_p, val_p, flags,
3674                                    user_ptr_internal, global_trove_context,
3675                                    &(jd->u.trove.id));
3676#else
3677    gossip_err("Error: Trove support not enabled.\n");
3678    ret = -ENOSYS;
3679#endif
3680
3681    if (ret < 0)
3682    {
3683        /* error posting trove operation */
3684        dealloc_job_desc(jd);
3685        jd = NULL;
3686        out_status_p->error_code = ret;
3687        out_status_p->status_user_tag = status_user_tag;
3688        return (1);
3689    }
3690
3691    if (ret == 1)
3692    {
3693        /* immediate completion */
3694        out_status_p->error_code = 0;
3695        out_status_p->status_user_tag = status_user_tag;
3696        dealloc_job_desc(jd);
3697        jd = NULL;
3698        return (ret);
3699    }
3700
3701    /* if we fall through to this point, the job did not
3702     * immediately complete and we must queue up to test later
3703     */
3704    *id = jd->job_id;
3705    trove_pending_count++;
3706
3707    return (0);
3708}
3709
3710/* job_trove_fs_get_eattr()
3711 *
3712 * gets extended attributes for a file system
3713 *
3714 * returns 0 on success, 1 on immediate completion, and -errno on
3715 * failure
3716 */
3717int job_trove_fs_geteattr(PVFS_fs_id coll_id,
3718                          PVFS_ds_keyval * key_p,
3719                          PVFS_ds_keyval * val_p,
3720                          PVFS_ds_flags flags,
3721                          void *user_ptr,
3722                          job_aint status_user_tag,
3723                          job_status_s * out_status_p,
3724                          job_id_t * id,
3725                          job_context_id context_id,
3726                          PVFS_hint hints)
3727{
3728    /* post a trove collection get eattr.  If it completes (or fails)
3729     * immediately, then return and fill in the status structure.
3730     * If it needs to be tested for completion later, then queue
3731     * up a job_desc structure.  */
3732    int ret = -1;
3733    struct job_desc *jd = NULL;
3734    void* user_ptr_internal;
3735
3736    /* create the job desc first, even though we may not use it.  This
3737     * gives us somewhere to store the BMI id and user ptr
3738     */
3739    jd = alloc_job_desc(JOB_TROVE);
3740    if (!jd)
3741    {
3742        out_status_p->error_code = -PVFS_ENOMEM;
3743        return 1;
3744    }
3745    jd->job_user_ptr = user_ptr;
3746    jd->context_id = context_id;
3747    jd->status_user_tag = status_user_tag;
3748    jd->trove_callback.fn = trove_thread_mgr_callback;
3749    jd->trove_callback.data = (void*)jd;
3750    user_ptr_internal = &jd->trove_callback;
3751
3752#ifdef __PVFS2_TROVE_SUPPORT__
3753    ret = trove_collection_geteattr(coll_id, key_p, val_p, flags,
3754                                    user_ptr_internal, global_trove_context,
3755                                    &(jd->u.trove.id));
3756#else
3757    gossip_err("Error: Trove support not enabled.\n");
3758    ret = -ENOSYS;
3759#endif
3760
3761    if (ret < 0)
3762    {
3763        /* error posting trove operation */
3764        dealloc_job_desc(jd);
3765        jd = NULL;
3766        out_status_p->error_code = ret;
3767        out_status_p->status_user_tag = status_user_tag;
3768        return (1);
3769    }
3770
3771    if (ret == 1)
3772    {
3773        /* immediate completion */
3774        out_status_p->error_code = 0;
3775        out_status_p->status_user_tag = status_user_tag;
3776        dealloc_job_desc(jd);
3777        jd = NULL;
3778        return (ret);
3779    }
3780
3781    /* if we fall through to this point, the job did not
3782     * immediately complete and we must queue up to test later
3783     */
3784    *id = jd->job_id;
3785    trove_pending_count++;
3786
3787    return (0);
3788}
3789
3790/* job_null()
3791 *
3792 * post null job; can be used to trigger asynchronous state transitions
3793 * without doing any underlying work
3794 *
3795 * returns 0 on success, -PVFS_error on failure
3796 * NOTE: immediate completion not allowed here
3797 */
3798int job_null(
3799    int error_code,
3800    void *user_ptr,
3801    job_aint status_user_tag,
3802    job_status_s * out_status_p,
3803    job_id_t * id,
3804    job_context_id context_id)
3805{
3806    struct job_desc *jd = NULL;
3807
3808    jd = alloc_job_desc(JOB_NULL);
3809    if (!jd)
3810    {
3811        out_status_p->error_code = -PVFS_ENOMEM;
3812        return 1;
3813    }
3814    jd->job_user_ptr = user_ptr;
3815    jd->context_id = context_id;
3816    jd->status_user_tag = status_user_tag;
3817    jd->u.null_info.error_code = error_code;
3818
3819    gen_mutex_lock(&completion_mutex);
3820    job_desc_q_add(completion_queue_array[jd->context_id],
3821        jd);
3822    /* set completed flag while holding queue lock */
3823    jd->completed_flag = 1;
3824#ifdef __PVFS2_JOB_THREADED__
3825    /* wake up anyone waiting for completion */
3826    pthread_cond_signal(&completion_cond);
3827#endif
3828    gen_mutex_unlock(&completion_mutex);
3829
3830    return(0);
3831}
3832
3833
3834/* job_test()
3835 *
3836 * check for completion of a particular job, don't return until
3837 * either job completes or timeout expires
3838 *
3839 * returns 0 if nothing done, 1 if something done, -errno on failure
3840 */
3841int job_test(job_id_t id,
3842             int *out_count_p,
3843             void **returned_user_ptr_p,
3844             job_status_s * out_status_p,
3845             int timeout_ms,
3846             job_context_id context_id)
3847{
3848    int ret = -1;
3849    int tmp_index;
3850
3851    *out_count_p = 1;
3852
3853    /* job_test() is really just a special case of job_testsome() */
3854    ret = job_testsome(&id, out_count_p, &tmp_index, returned_user_ptr_p,
3855        out_status_p, timeout_ms, context_id);
3856
3857    return(ret);
3858}
3859
3860#ifdef __PVFS2_JOB_THREADED__
3861
3862/* job_testsome()
3863 *
3864 * check for completion of a set of jobs, don't return until
3865 * either all jobs complete or timeout expires
3866 *
3867 * returns 0 on success, -errno on failure
3868 */
3869int job_testsome(job_id_t * id_array,
3870                 int *inout_count_p,
3871                 int *out_index_array,
3872                 void **returned_user_ptr_array,
3873                 job_status_s * out_status_array_p,
3874                 int timeout_ms,
3875                 job_context_id context_id)
3876{
3877    int ret = -1;
3878    struct timespec pthread_timeout;
3879    struct timeval start;
3880    int original_count = *inout_count_p;
3881    int pthread_ret = -1;
3882
3883    /* use this as a chance to do a cheap test on the request
3884     * scheduler
3885     */
3886    if ((ret = do_one_test_cycle_req_sched()) < 0)
3887    {
3888        return (ret);
3889    }
3890
3891    /* figure out how long to wait if we need to */
3892    if(timeout_ms > 0)
3893    {
3894        ret = gettimeofday(&start, NULL);
3895        if (ret < 0)
3896        {
3897            return (ret);
3898        }
3899        pthread_timeout.tv_sec = start.tv_sec + timeout_ms / 1000;
3900        pthread_timeout.tv_nsec = (start.tv_usec + ((timeout_ms % 1000)*1000))*1000;
3901        if (pthread_timeout.tv_nsec > 1000000000)
3902        {
3903            pthread_timeout.tv_nsec = pthread_timeout.tv_nsec - 1000000000;
3904            pthread_timeout.tv_sec++;
3905        }
3906    }
3907
3908    /* check for completed jobs */
3909    gen_mutex_lock(&completion_mutex);
3910    pthread_ret = 0;
3911    while(((ret = completion_query_some(id_array,
3912        inout_count_p,
3913        out_index_array,
3914        returned_user_ptr_array,
3915        out_status_array_p)) == 0) &&
3916        ((pthread_ret == EINTR) || (pthread_ret == 0)))
3917    {
3918        *inout_count_p = original_count;
3919
3920        if(timeout_ms > 0)
3921        {
3922            pthread_ret = pthread_cond_timedwait(&completion_cond,
3923                &completion_mutex,
3924                &pthread_timeout);
3925        }
3926        else if(timeout_ms == 0)
3927        {
3928            pthread_ret = ETIMEDOUT;
3929        }
3930        else
3931        {
3932            /* block indefinitely */
3933            pthread_ret = pthread_cond_wait(&completion_cond,
3934                &completion_mutex);
3935        }
3936    }
3937    gen_mutex_unlock(&completion_mutex);
3938
3939    if(ret == 0)
3940    {
3941        *inout_count_p = 0;
3942
3943        if ((pthread_ret != 0) && (pthread_ret != EINTR) && (pthread_ret !=
3944            EINVAL) && pthread_ret != ETIMEDOUT)
3945        {
3946            /* pthread_cond_wait() gave a weird return code; pass along to
3947             * caller
3948             */
3949            ret = pthread_ret;
3950        }
3951    }
3952
3953    return(ret);
3954}
3955
3956#else /* __PVFS2_JOB_THREADED__ */
3957
3958/* job_testsome()
3959 *
3960 * check for completion of a set of jobs, don't return until
3961 * either all jobs complete or timeout expires
3962 *
3963 * returns 0 if nothing done, 1 if something done, -errno on failure
3964 */
3965int job_testsome(job_id_t * id_array,
3966                 int *inout_count_p,
3967                 int *out_index_array,
3968                 void **returned_user_ptr_array,
3969                 job_status_s * out_status_array_p,
3970                 int timeout_ms,
3971                 job_context_id context_id)
3972{
3973    int ret = -1;
3974    struct timeval target_time;
3975    struct timeval end;
3976    int original_count = *inout_count_p;
3977    int time_exhaust_flag = 0;
3978
3979    /* use this as a chance to do a cheap test on the request
3980     * scheduler
3981     */
3982    if ((ret = do_one_test_cycle_req_sched()) < 0)
3983    {
3984        return (ret);
3985    }
3986
3987    /* check before we do anything else to see if the completion queue
3988     * has anything in it
3989     */
3990    gen_mutex_lock(&completion_mutex);
3991    ret = completion_query_some(id_array,
3992                                 inout_count_p,
3993                                 out_index_array,
3994                                 returned_user_ptr_array,
3995                                 out_status_array_p);
3996    gen_mutex_unlock(&completion_mutex);
3997    /* return here on error or completion */
3998    if (ret < 0)
3999    {
4000        return (ret);
4001    }
4002    if (ret > 0)
4003    {
4004        return (1);
4005    }
4006
4007    *inout_count_p = original_count;
4008
4009    /* figure out when the function should time out */
4010    if(timeout_ms > 0)
4011    {
4012        ret = gettimeofday(&target_time, NULL);
4013        if (ret < 0)
4014        {
4015            return (ret);
4016        }
4017        target_time.tv_sec += (timeout_ms/1000);
4018        target_time.tv_usec += (timeout_ms%1000)*1000;
4019        if(target_time.tv_usec > 1000000)
4020        {
4021            target_time.tv_sec++;
4022            target_time.tv_usec -= 1000000;
4023        }
4024    }
4025
4026    /* if we fall through to this point, then we need to just try
4027     * to eat up the timeout until the jobs that we want hit the
4028     * completion queue
4029     */
4030    do
4031    {
4032
4033        if (timeout_ms)
4034        {
4035            do_one_work_cycle_all(10);
4036        }
4037        else
4038        {
4039            do_one_work_cycle_all(0);
4040        }
4041
4042        /* check queue now to see if anything is done */
4043        gen_mutex_lock(&completion_mutex);
4044        ret = completion_query_some(id_array,
4045                                     inout_count_p,
4046                                     out_index_array,
4047                                     returned_user_ptr_array,
4048                                     out_status_array_p);
4049        gen_mutex_unlock(&completion_mutex);
4050        /* return here on error or completion */
4051        if (ret < 0)
4052        {
4053            return (ret);
4054        }
4055        if (ret  > 0)
4056        {
4057            return (1);
4058        }
4059
4060        *inout_count_p = original_count;
4061
4062        /* if we reach this point, decide if we timeout or continue */
4063        if(timeout_ms == 0)
4064        {
4065            time_exhaust_flag = 1;
4066        }
4067        else if(timeout_ms < 0)
4068        {
4069            time_exhaust_flag = 0;
4070        }
4071        else
4072        {
4073            ret = gettimeofday(&end, NULL);
4074            if (ret < 0)
4075            {
4076                return (ret);
4077            }
4078
4079            /* compare current time with our projected timeout point */
4080            if((end.tv_sec > target_time.tv_sec) || ((end.tv_sec ==
4081                target_time.tv_sec) && (end.tv_usec >=
4082                target_time.tv_usec)))
4083            {
4084                time_exhaust_flag = 1;
4085            }
4086            else
4087            {
4088                time_exhaust_flag = 0;
4089            }
4090        }
4091
4092    } while (!time_exhaust_flag);
4093
4094    /* fall through, nothing done, time is used up */
4095    *inout_count_p = 0;
4096
4097    return (0);
4098}
4099#endif /* __PVFS2_JOB_THREADED__ */
4100
4101#ifdef __PVFS2_JOB_THREADED__
4102/* job_testcontext()
4103 *
4104 * check for completion of any jobs currently in progress.  Don't return
4105 * until either at least one job has completed or the timeout has
4106 * expired
4107 *
4108 * returns 0 on success, -errno on failure
4109 */
4110int job_testcontext(job_id_t * out_id_array_p,
4111                    int *inout_count_p,
4112                    void **returned_user_ptr_array,
4113                    job_status_s * out_status_array_p,
4114                    int timeout_ms,
4115                    job_context_id context_id)
4116{
4117    int ret = -1;
4118    struct timespec pthread_timeout;
4119    struct timeval start;
4120    int original_count = *inout_count_p;
4121    int pthread_ret = -1;
4122
4123    /* use this as a chance to do a cheap test on the request
4124     * scheduler
4125     */
4126    if ((ret = do_one_test_cycle_req_sched()) < 0)
4127    {
4128        return (ret);
4129    }
4130
4131    /* figure out how long to wait if we need to */
4132    if(timeout_ms > 0)
4133    {
4134        ret = gettimeofday(&start, NULL);
4135        if (ret < 0)
4136        {
4137            return (ret);
4138        }
4139        pthread_timeout.tv_sec = start.tv_sec + timeout_ms / 1000;
4140        pthread_timeout.tv_nsec = (start.tv_usec + ((timeout_ms % 1000)*1000))*1000;
4141        if (pthread_timeout.tv_nsec > 1000000000)
4142        {
4143            pthread_timeout.tv_nsec = pthread_timeout.tv_nsec - 1000000000;
4144            pthread_timeout.tv_sec++;
4145        }
4146    }
4147
4148    /* check for completed jobs */
4149    gen_mutex_lock(&completion_mutex);
4150    pthread_ret = 0;
4151    while(((ret = completion_query_context(out_id_array_p,
4152        inout_count_p,
4153        returned_user_ptr_array,
4154        out_status_array_p, context_id)) == 0) &&
4155        ((pthread_ret == EINTR) || (pthread_ret == 0)))
4156    {
4157        *inout_count_p = original_count;
4158
4159        if(timeout_ms > 0)
4160        {
4161            pthread_ret = pthread_cond_timedwait(&completion_cond,
4162                &completion_mutex,
4163                &pthread_timeout);
4164        }
4165        else if(timeout_ms == 0)
4166        {
4167            pthread_ret = ETIMEDOUT;
4168        }
4169        else
4170        {
4171            /* block indefinitely */
4172            pthread_ret = pthread_cond_wait(&completion_cond,
4173                &completion_mutex);
4174        }
4175    }
4176    gen_mutex_unlock(&completion_mutex);
4177
4178    if(ret == 0)
4179    {
4180        *inout_count_p = 0;
4181
4182        if ((pthread_ret != 0) && (pthread_ret != EINTR) && (pthread_ret !=
4183            EINVAL) && pthread_ret != ETIMEDOUT)
4184        {
4185            /* pthread_cond_wait() gave a weird return code; pass along to
4186             * caller
4187             */
4188            ret = pthread_ret;
4189        }
4190    }
4191
4192    return(ret);
4193}
4194
4195#else /* __PVFS2_JOB_THREADED__ */
4196
4197/* job_testcontext()
4198 *
4199 * check for completion of any jobs currently in progress.  Don't return
4200 * until either at least one job has completed or the timeout has
4201 * expired
4202 *
4203 * returns 0 on success, -errno on failure
4204 */
4205int job_testcontext(job_id_t * out_id_array_p,
4206                    int *inout_count_p,
4207                    void **returned_user_ptr_array,
4208                    job_status_s * out_status_array_p,
4209                    int timeout_ms,
4210                    job_context_id context_id)
4211{
4212    int ret = -1;
4213    struct timeval target_time;
4214    struct timeval end;
4215    int original_count = *inout_count_p;
4216    int time_exhaust_flag = 0;
4217
4218    /* use this as a chance to do a cheap test on the request
4219     * scheduler
4220     */
4221    if ((ret = do_one_test_cycle_req_sched()) < 0)
4222    {
4223        return (ret);
4224    }
4225
4226    /* check before we do anything else to see if the completion queue
4227     * has anything in it
4228     */
4229    gen_mutex_lock(&completion_mutex);
4230    ret = completion_query_context(out_id_array_p,
4231                                 inout_count_p,
4232                                 returned_user_ptr_array,
4233                                 out_status_array_p, context_id);
4234    gen_mutex_unlock(&completion_mutex);
4235    /* return here on error or completion */
4236    if (ret < 0)
4237    {
4238        return (ret);
4239    }
4240    if (ret > 0)
4241    {
4242        return (1);
4243    }
4244
4245    *inout_count_p = original_count;
4246
4247    /* figure out when the function should time out */
4248    if(timeout_ms > 0)
4249    {
4250        ret = gettimeofday(&target_time, NULL);
4251        if (ret < 0)
4252        {
4253            return (ret);
4254        }
4255        target_time.tv_sec += (timeout_ms/1000);
4256        target_time.tv_usec += (timeout_ms%1000)*1000;
4257        if(target_time.tv_usec > 1000000)
4258        {
4259            target_time.tv_sec++;
4260            target_time.tv_usec -= 1000000;
4261        }
4262    }
4263
4264    /* if we fall through to this point, then we need to just try
4265     * to eat up the timeout until the jobs that we want hit the
4266     * completion queue
4267     */
4268    do
4269    {
4270
4271        if (timeout_ms)
4272        {
4273            do_one_work_cycle_all(10);
4274        }
4275        else
4276        {
4277            do_one_work_cycle_all(0);
4278        }
4279
4280        /* check queue now to see if anything is done */
4281        gen_mutex_lock(&completion_mutex);
4282        ret = completion_query_context(out_id_array_p,
4283                                     inout_count_p,
4284                                     returned_user_ptr_array,
4285                                     out_status_array_p,
4286                                     context_id);
4287        gen_mutex_unlock(&completion_mutex);
4288        /* return here on error or completion */
4289        if (ret < 0)
4290        {
4291            return (ret);
4292        }
4293        if (ret  > 0)
4294        {
4295            return (1);
4296        }
4297
4298        *inout_count_p = original_count;
4299
4300        /* if we reach this point, decide if we timeout or continue */
4301        if(timeout_ms == 0)
4302        {
4303            time_exhaust_flag = 1;
4304        }
4305        else if(timeout_ms < 0)
4306        {
4307            time_exhaust_flag = 0;
4308        }
4309        else
4310        {
4311            ret = gettimeofday(&end, NULL);
4312            if (ret < 0)
4313            {
4314                return (ret);
4315            }
4316
4317            /* compare current time with our projected timeout point */
4318            if((end.tv_sec > target_time.tv_sec) || ((end.tv_sec ==
4319                target_time.tv_sec) && (end.tv_usec >=
4320                target_time.tv_usec)))
4321            {
4322                time_exhaust_flag = 1;
4323            }
4324            else
4325            {
4326                time_exhaust_flag = 0;
4327            }
4328        }
4329
4330    } while (!time_exhaust_flag);
4331
4332    /* fall through, nothing done, time is used up */
4333    *inout_count_p = 0;
4334
4335    return (0);
4336}
4337#endif /* __PVFS2_JOB_THREADED__ */
4338
4339
4340/*********************************************************
4341 * Internal utility functions
4342 */
4343
4344
4345/* setup_queues()
4346 *
4347 * initializes all of the job queues needed by the interface
4348 *
4349 * returns 0 on success, -errno on failure
4350 */
4351static int setup_queues(void)
4352{
4353
4354    gen_mutex_lock(&bmi_unexp_mutex);
4355    bmi_unexp_queue = job_desc_q_new();
4356    gen_mutex_unlock(&bmi_unexp_mutex);
4357
4358    gen_mutex_lock(&dev_unexp_mutex);
4359    dev_unexp_queue = job_desc_q_new();
4360    gen_mutex_unlock(&dev_unexp_mutex);
4361
4362    if (!bmi_unexp_queue || !dev_unexp_queue)
4363    {
4364        /* cleanup any that were initialized */
4365        teardown_queues();
4366        return (-ENOMEM);
4367    }
4368    return (0);
4369}
4370
4371/* teardown_queues()
4372 *
4373 * tears down any existing queues used by the job interface
4374 *
4375 * no return value
4376 */
4377static void teardown_queues(void)
4378{
4379
4380    gen_mutex_lock(&bmi_unexp_mutex);
4381    if (bmi_unexp_queue)
4382    {
4383        job_desc_q_cleanup(bmi_unexp_queue);
4384    }
4385    gen_mutex_unlock(&bmi_unexp_mutex);
4386
4387    gen_mutex_lock(&dev_unexp_mutex);
4388    if (dev_unexp_queue)
4389    {
4390        job_desc_q_cleanup(dev_unexp_queue);
4391    }
4392    gen_mutex_unlock(&dev_unexp_mutex);
4393
4394    return;
4395}
4396
4397#ifdef __PVFS2_TROVE_SUPPORT__
4398
4399/* precreate_pool_get_thread_mgr_callback_unlocked()
4400 *
4401 * callback function executed by the thread manager for precreate pool get
4402 * when a trove operation completes
4403 *
4404 * no return value
4405 */
4406static void precreate_pool_get_thread_mgr_callback_unlocked(
4407    void* data,
4408    PVFS_error error_code)
4409{
4410    struct precreate_pool_get_trove* tmp_trove = data;
4411   
4412    gen_mutex_lock(&initialized_mutex);
4413    if(initialized == 0)
4414    {
4415        /* The job interface has been shutdown.  Silently ignore callback. */
4416        gen_mutex_unlock(&initialized_mutex);
4417        return;
4418    }
4419    gen_mutex_unlock(&initialized_mutex);
4420
4421    if(error_code == 0)
4422    {
4423        gossip_debug(GOSSIP_JOB_DEBUG,
4424            "Got precreated handle: %llu\n",
4425            llu(*((PVFS_handle*)tmp_trove->key.buffer)));
4426    }
4427
4428    trove_pending_count--;
4429    assert(trove_pending_count >= 0);
4430
4431    tmp_trove->jd->u.precreate_pool.trove_pending--;
4432    assert(tmp_trove->jd->u.precreate_pool.trove_pending >= 0);
4433
4434    /* don't overwrite error codes from other trove ops */
4435    if(tmp_trove->jd->u.precreate_pool.error_code == 0)
4436    {
4437        tmp_trove->jd->u.precreate_pool.error_code = error_code;
4438    }
4439
4440    /* is this job done? */
4441    if(tmp_trove->jd->u.precreate_pool.trove_pending == 0)
4442    {
4443        gen_mutex_lock(&completion_mutex);
4444
4445        /* set job descriptor fields and put into completion queue */
4446        tmp_trove->jd->u.precreate_pool.error_code = 0;
4447        job_desc_q_add(completion_queue_array[tmp_trove->jd->context_id],
4448                       tmp_trove->jd);
4449        /* set completed flag while holding queue lock */
4450        tmp_trove->jd->completed_flag = 1;
4451
4452#ifdef __PVFS2_JOB_THREADED__
4453        /* wake up anyone waiting for completion */
4454        pthread_cond_signal(&completion_cond);
4455#endif
4456        free(tmp_trove->jd->u.precreate_pool.data);
4457        gen_mutex_unlock(&completion_mutex);
4458        return;
4459    }
4460
4461    return;
4462}
4463
4464
4465/* precreate_pool_iterate_callback()
4466 *
4467 * callback function executed by the thread mgr when a trove iterate
4468 * completes
4469 *
4470 * no return value
4471 */
4472static void precreate_pool_iterate_callback(
4473    void* data,
4474    PVFS_error error_code)
4475{   
4476    struct job_desc* tmp_desc = (struct job_desc*)data;
4477
4478    gen_mutex_lock(&initialized_mutex);
4479    if(initialized == 0)
4480    {
4481        /* The job interface has been shutdown.  Silently ignore callback. */
4482        gen_mutex_unlock(&initialized_mutex);
4483        return;
4484    }
4485    gen_mutex_unlock(&initialized_mutex);
4486
4487    gen_mutex_lock(&completion_mutex);
4488    if (tmp_desc->completed_flag == 0)
4489    {
4490        /* set job descriptor fields and put into completion queue */
4491        tmp_desc->u.precreate_pool.error_code = error_code;
4492        free(tmp_desc->u.precreate_pool.key_array);
4493        job_desc_q_add(completion_queue_array[tmp_desc->context_id],
4494                       tmp_desc);
4495        /* set completed flag while holding queue lock */
4496        tmp_desc->completed_flag = 1;
4497
4498        trove_pending_count--;
4499
4500#ifdef __PVFS2_JOB_THREADED__
4501        /* wake up anyone waiting for completion */
4502        pthread_cond_signal(&completion_cond);
4503#endif
4504    }
4505    gen_mutex_unlock(&completion_mutex);
4506
4507    return;
4508}
4509
4510/* precreate_pool_get_thread_mgr_callback()
4511 *
4512 * callback function executed by the thread manager for precreate pool get
4513 * when a trove operation completes
4514 *
4515 * no return value
4516 */
4517static void precreate_pool_get_thread_mgr_callback(
4518    void* data,
4519    PVFS_error error_code)
4520{
4521    gen_mutex_lock(&precreate_pool_mutex);
4522    precreate_pool_get_thread_mgr_callback_unlocked(data, error_code);
4523    gen_mutex_unlock(&precreate_pool_mutex);
4524}
4525
4526/* precreate_pool_fill_thread_mgr_callback()
4527 *
4528 * callback function executed by the thread manager for precreate pool fill
4529 * when a trove operation completes
4530 *
4531 * no return value
4532 */
4533static void precreate_pool_fill_thread_mgr_callback(
4534    void* data,
4535    PVFS_error error_code)
4536{
4537    struct job_desc* jd = (struct job_desc*)data;
4538    struct job_desc* jd_checker;
4539    int ret;
4540    int count = 0;
4541    int i;
4542    struct qlist_head* iterator;
4543    struct qlist_head* scratch;
4544    struct precreate_pool* pool;
4545    int awoken_count = 0;
4546    QLIST_HEAD(tmp_list);
4547    job_id_t tmp_id;
4548    int extra_trove_flags = 0;
4549    struct fs_pool* fs;
4550
4551    assert(jd);
4552
4553    gen_mutex_lock(&initialized_mutex);
4554    if(initialized == 0)
4555    {
4556        /* The job interface has been shutdown.  Silently ignore callback. */
4557        gen_mutex_unlock(&initialized_mutex);
4558        return;
4559    }
4560    gen_mutex_unlock(&initialized_mutex);
4561
4562    if(error_code != 0)
4563    {
4564        gossip_err("Error: unable to write all precreated handles to pool.\n");
4565        gossip_err("Warning: fsck may be needed to recover stranded handles.\n");
4566        free(jd->u.precreate_pool.key_array);
4567        gen_mutex_lock(&completion_mutex);
4568
4569        /* set job descriptor fields and put into completion queue */
4570        jd->u.precreate_pool.error_code = error_code;
4571        job_desc_q_add(completion_queue_array[jd->context_id], jd);
4572        /* set completed flag while holding queue lock */
4573        jd->completed_flag = 1;
4574#ifdef __PVFS2_JOB_THREADED__
4575        /* wake up anyone waiting for completion */
4576        pthread_cond_signal(&completion_cond);
4577#endif
4578        gen_mutex_unlock(&completion_mutex);
4579        return;
4580    }
4581
4582    if(jd->u.precreate_pool.first_callback_flag == 1)
4583    {
4584        /* this is the first post */
4585        gossip_debug(GOSSIP_JOB_DEBUG, "precreate_pool_fill_thread_mgr_callback() first post.\n");
4586        jd->u.precreate_pool.first_callback_flag = 0;
4587    }
4588    else
4589    {
4590        gossip_debug(GOSSIP_JOB_DEBUG, "precreate_pool_fill_thread_mgr_callback() completed trove op.\n");
4591        /* a trove operation completed successfully */
4592        jd->u.precreate_pool.precreate_handle_index +=
4593            jd->u.precreate_pool.posted_count;
4594        trove_pending_count--;
4595
4596        /* increment in-memory count for this pool */
4597        gen_mutex_lock(&precreate_pool_mutex);
4598        fs = find_fs(jd->u.precreate_pool.fsid);
4599        assert(fs);
4600
4601        qlist_for_each(iterator, &fs->precreate_pool_list)
4602        {
4603            pool = qlist_entry(iterator, struct precreate_pool,
4604                list_link);
4605            if(pool->pool_handle == jd->u.precreate_pool.precreate_pool)
4606            {
4607                pool->pool_count += jd->u.precreate_pool.posted_count;
4608                gossip_debug(GOSSIP_JOB_DEBUG,
4609                    "Pool count for handle %llu incremented to %d\n",
4610                    llu(pool->pool_handle),
4611                    pool->pool_count);
4612                break;
4613            }
4614        }
4615
4616        /* find out if anyone was sleeping because a pool was empty */
4617        gossip_debug(GOSSIP_JOB_DEBUG, "checking for get_handles() sleepers\n");
4618        qlist_for_each_safe(iterator, scratch,
4619            &precreate_pool_get_handles_list)
4620        {
4621            jd_checker = qlist_entry(iterator, struct job_desc,
4622                job_desc_q_link);
4623
4624            awoken_count++;
4625            /* put them on a new local queue */
4626            qlist_del(&jd_checker->job_desc_q_link);
4627            qlist_add(&jd_checker->job_desc_q_link, &tmp_list);
4628            gossip_debug(GOSSIP_JOB_DEBUG, "Found someone waiting to get handles from precreate pool\n");
4629
4630            if(awoken_count == jd->u.precreate_pool.posted_count)
4631            {
4632                /* that's as many as we should wake up right now */
4633                break;
4634            }
4635        }
4636        gen_mutex_unlock(&precreate_pool_mutex);
4637
4638        /* now that we have collected the sleepers into our own private
4639         * queue, we can push them without the precreate_pool_mutex held
4640         */
4641        gossip_debug(GOSSIP_JOB_DEBUG, "About to push on get_handles() sleepers.\n");
4642        qlist_for_each_safe(iterator, scratch, &tmp_list)
4643        {
4644            jd_checker = qlist_entry(iterator, struct job_desc,
4645                job_desc_q_link);
4646            qlist_del(&jd_checker->job_desc_q_link);
4647            gossip_debug(GOSSIP_JOB_DEBUG, "Pushing get_handles() sleeper for jd: %p.\n", jd_checker);
4648            precreate_pool_get_handles_try_post(jd_checker);
4649        }
4650    }
4651
4652    /* are we done? */
4653    if(jd->u.precreate_pool.precreate_handle_index >=
4654        jd->u.precreate_pool.precreate_handle_count)
4655    {
4656        free(jd->u.precreate_pool.key_array);
4657        gen_mutex_lock(&completion_mutex);
4658
4659        /* set job descriptor fields and put into completion queue */
4660        jd->u.precreate_pool.error_code = 0;
4661        job_desc_q_add(completion_queue_array[jd->context_id],
4662                       jd);
4663        /* set completed flag while holding queue lock */
4664        jd->completed_flag = 1;
4665
4666#ifdef __PVFS2_JOB_THREADED__
4667        /* wake up anyone waiting for completion */
4668        pthread_cond_signal(&completion_cond);
4669#endif
4670        gen_mutex_unlock(&completion_mutex);
4671        return;
4672    }
4673
4674    /* fill in information for next keyval write */
4675    for(i=jd->u.precreate_pool.precreate_handle_index;
4676        (i < jd->u.precreate_pool.precreate_handle_count &&
4677        (i < (jd->u.precreate_pool.precreate_handle_index
4678           + PRECREATE_POOL_MAX_KEYS)));
4679        i++)
4680    {
4681        jd->u.precreate_pool.key_array[count].buffer =
4682            &jd->u.precreate_pool.precreate_handle_array[i];
4683        jd->u.precreate_pool.key_array[count].buffer_sz = sizeof(PVFS_handle);
4684        count++;
4685
4686        /* always leave the values zeroed out */
4687    }
4688
4689    jd->u.precreate_pool.posted_count = count;
4690
4691    if((jd->u.precreate_pool.posted_count
4692        + jd->u.precreate_pool.precreate_handle_index)
4693        >= jd->u.precreate_pool.precreate_handle_count)
4694    {
4695        /* this will be the last set written; sync db */
4696        extra_trove_flags |= TROVE_SYNC;
4697    }
4698
4699    gossip_debug(GOSSIP_JOB_DEBUG, "job_precreate_pool_fill() posting trove_keyval_write_list()\n");
4700    ret = trove_keyval_write_list(jd->u.precreate_pool.fsid,
4701                            jd->u.precreate_pool.precreate_pool,
4702                            jd->u.precreate_pool.key_array,
4703                            NULL,
4704                            count,
4705                            (TROVE_BINARY_KEY|TROVE_NOOVERWRITE|
4706                            TROVE_KEYVAL_HANDLE_COUNT|extra_trove_flags),
4707                            NULL,
4708                            &jd->trove_callback,
4709                            global_trove_context,
4710                            &tmp_id,
4711                            jd->hints);
4712   
4713    trove_pending_count++;
4714
4715    if(ret < 0)
4716    {
4717        gossip_err("Error: unable to write all precreated handles to pool.\n");
4718        gossip_err("Warning: fsck may be needed to recover stranded handles.\n");
4719        gen_mutex_lock(&completion_mutex);
4720
4721        /* set job descriptor fields and put into completion queue */
4722        jd->u.precreate_pool.error_code = ret;
4723        job_desc_q_add(completion_queue_array[jd->context_id], jd);
4724        /* set completed flag while holding queue lock */
4725        jd->completed_flag = 1;
4726#ifdef __PVFS2_JOB_THREADED__
4727        /* wake up anyone waiting for completion */
4728        pthread_cond_signal(&completion_cond);
4729#endif
4730        gen_mutex_unlock(&completion_mutex);
4731        return;
4732    }
4733    else if(ret == 1)
4734    {
4735        gossip_debug(GOSSIP_JOB_DEBUG, "trove_keyval_write_list() immediate completion\n");
4736        precreate_pool_fill_thread_mgr_callback(jd, 0);
4737    }
4738    else
4739    {
4740        gossip_debug(GOSSIP_JOB_DEBUG, "trove_keyval_write_list() returned zero\n");
4741    }
4742
4743    return;
4744}
4745#endif /* __PVFS2_TROVE_SUPPORT__ */
4746
4747
4748/* trove_thread_mgr_callback()
4749 *
4750 * callback function executed by the thread manager for Trove when a Trove
4751 * job completes
4752 *
4753 * no return value
4754 */
4755static void trove_thread_mgr_callback(
4756    void* data,
4757    PVFS_error error_code)
4758{
4759    struct job_desc* tmp_desc = (struct job_desc*)data;
4760    assert(tmp_desc);
4761
4762    gen_mutex_lock(&initialized_mutex);
4763    if(initialized == 0)
4764    {
4765        /* The job interface has been shutdown.  Silently ignore callback. */
4766        gen_mutex_unlock(&initialized_mutex);
4767        return;
4768    }
4769    gen_mutex_unlock(&initialized_mutex);
4770
4771    gen_mutex_lock(&completion_mutex);
4772    if (tmp_desc->completed_flag == 0)
4773    {
4774        /* set job descriptor fields and put into completion queue */
4775        tmp_desc->u.trove.state = error_code;
4776        job_desc_q_add(completion_queue_array[tmp_desc->context_id],
4777                       tmp_desc);
4778        /* set completed flag while holding queue lock */
4779        tmp_desc->completed_flag = 1;
4780
4781        trove_pending_count--;
4782
4783#ifdef __PVFS2_JOB_THREADED__
4784        /* wake up anyone waiting for completion */
4785        pthread_cond_signal(&completion_cond);
4786#endif
4787    }
4788    gen_mutex_unlock(&completion_mutex);
4789}
4790
4791/* bmi_thread_mgr_callback()
4792 *
4793 * callback function executed by the thread manager for BMI when a BMI
4794 * job completes
4795 *
4796 * no return value
4797 */
4798static void bmi_thread_mgr_callback(
4799    void* data,
4800    PVFS_size actual_size,
4801    PVFS_error error_code)
4802{
4803    struct job_desc* tmp_desc = (struct job_desc*)data;
4804    assert(tmp_desc);
4805
4806    gen_mutex_lock(&initialized_mutex);
4807    if(initialized == 0)
4808    {
4809        /* The job interface has been shutdown.  Silently ignore callback. */
4810        gen_mutex_unlock(&initialized_mutex);
4811        return;
4812    }
4813    gen_mutex_unlock(&initialized_mutex);
4814
4815    gen_mutex_lock(&completion_mutex);
4816    if (tmp_desc->completed_flag == 0)
4817    {
4818        /* set job descriptor fields and put into completion queue */
4819        tmp_desc->u.bmi.error_code = error_code;
4820        tmp_desc->u.bmi.actual_size = actual_size;
4821        job_desc_q_add(completion_queue_array[tmp_desc->context_id],
4822                       tmp_desc);
4823        /* set completed flag while holding queue lock */
4824        tmp_desc->completed_flag = 1;
4825
4826        bmi_pending_count--;
4827
4828#ifdef __PVFS2_JOB_THREADED__
4829        /* wake up anyone waiting for completion */
4830        pthread_cond_signal(&completion_cond);
4831#endif
4832    }
4833    gen_mutex_unlock(&completion_mutex);
4834}
4835
4836/* bmi_thread_mgr_unexp_handler()
4837 *
4838 * callback function executed by the thread manager for BMI when an unexpected
4839 * BMI message arrives
4840 *
4841 * no return value
4842 */
4843static void bmi_thread_mgr_unexp_handler(
4844    struct BMI_unexpected_info* unexp)
4845{
4846    struct job_desc* tmp_desc = NULL;
4847
4848    gen_mutex_lock(&initialized_mutex);
4849    if(initialized == 0)
4850    {
4851        /* The job interface has been shutdown.  Silently ignore callback. */
4852        gen_mutex_unlock(&initialized_mutex);
4853        return;
4854    }
4855    gen_mutex_unlock(&initialized_mutex);
4856
4857    gen_mutex_lock(&bmi_unexp_mutex);
4858    /* remove the operation from the pending bmi_unexp queue */
4859    tmp_desc = job_desc_q_shownext(bmi_unexp_queue);
4860    assert(tmp_desc != NULL);
4861    if (tmp_desc->completed_flag == 0)
4862    {
4863        job_desc_q_remove(tmp_desc);
4864        bmi_unexp_pending_count--;
4865        gen_mutex_unlock(&bmi_unexp_mutex);
4866        /* set appropriate fields and store in completed queue */
4867        *(tmp_desc->u.bmi_unexp.info) = *unexp;
4868        gen_mutex_lock(&completion_mutex);
4869        /* set completed flag while holding queue lock */
4870        tmp_desc->completed_flag = 1;
4871        if (completion_queue_array[tmp_desc->context_id])
4872        {
4873            job_desc_q_add(completion_queue_array[tmp_desc->context_id],
4874                           tmp_desc);
4875        }
4876
4877#ifdef __PVFS2_JOB_THREADED__
4878        /* wake up anyone waiting for completion */
4879        pthread_cond_signal(&completion_cond);
4880#endif
4881        gen_mutex_unlock(&completion_mutex);
4882    }
4883    else
4884    {
4885        gen_mutex_unlock(&bmi_unexp_mutex);
4886    }
4887}
4888
4889/* dev_thread_mgr_unexp_handler()
4890 *
4891 * callback function executed by the thread manager for dev when an unexpected
4892 * device message arrives
4893 *
4894 * no return value
4895 */
4896static void dev_thread_mgr_unexp_handler(struct PINT_dev_unexp_info* unexp)
4897{
4898    struct job_desc* tmp_desc = NULL;
4899
4900    gen_mutex_lock(&dev_unexp_mutex);
4901    /* remove the operation from the pending dev_unexp queue */
4902    tmp_desc = job_desc_q_shownext(dev_unexp_queue);
4903    /* if the thread mgr accounting is accurate, then there _must_ be a
4904     * dev_unexp job posted for us to hit this point.
4905     */
4906    assert(tmp_desc != NULL);
4907    if (tmp_desc->completed_flag == 0)
4908    {
4909        job_desc_q_remove(tmp_desc);
4910        dev_unexp_pending_count--;
4911        gen_mutex_unlock(&dev_unexp_mutex);
4912        /* set appropriate fields and store in completed queue */
4913        *(tmp_desc->u.dev_unexp.info) = *unexp;
4914        gen_mutex_lock(&completion_mutex);
4915        /* set completed flag while holding queue lock */
4916        tmp_desc->completed_flag = 1;
4917        if (completion_queue_array[tmp_desc->context_id])
4918        {
4919            job_desc_q_add(completion_queue_array[tmp_desc->context_id],
4920                           tmp_desc);
4921        }
4922
4923#ifdef __PVFS2_JOB_THREADED__
4924        /* wake up anyone waiting for completion */
4925        pthread_cond_signal(&completion_cond);
4926#endif
4927        gen_mutex_unlock(&completion_mutex);
4928    }
4929    else
4930    {
4931        gen_mutex_unlock(&dev_unexp_mutex);
4932    }
4933}
4934
4935/* fill_status()
4936 *
4937 * fills in the completion status based on the given job descriptor
4938 *
4939 * no return value
4940 */
4941static void fill_status(struct job_desc *jd,
4942                        void **returned_user_ptr_p,
4943                        job_status_s * status)
4944{
4945    assert(jd);
4946    assert(status);
4947
4948#if 0
4949    gossip_debug(GOSSIP_JOB_DEBUG,
4950        "job fill_status() for id: %llu, type: %d\n",
4951        llu(jd->job_id), jd->type);
4952#endif
4953
4954    status->status_user_tag = jd->status_user_tag;
4955
4956    if (returned_user_ptr_p)
4957    {
4958        *returned_user_ptr_p = jd->job_user_ptr;
4959    }
4960    switch (jd->type)
4961    {
4962    case JOB_BMI:
4963        job_time_mgr_rem(jd);
4964        status->error_code = jd->u.bmi.error_code;
4965        status->actual_size = jd->u.bmi.actual_size;
4966        break;
4967    case JOB_BMI_UNEXP:
4968        status->error_code = jd->u.bmi_unexp.info->error_code;
4969        status->actual_size = jd->u.bmi_unexp.info->size;
4970        break;
4971    case JOB_FLOW:
4972        job_time_mgr_rem(jd);
4973        status->error_code = jd->u.flow.flow_d->error_code;
4974        status->actual_size = jd->u.flow.flow_d->total_transferred;
4975        break;
4976    case JOB_REQ_SCHED:
4977        status->error_code = jd->u.req_sched.error_code;
4978        break;
4979    case JOB_TROVE:
4980        status->error_code = jd->u.trove.state;
4981        if(jd->u.trove.out_size_p)
4982            status->actual_size = *jd->u.trove.out_size_p;
4983        status->vtag = jd->u.trove.vtag;
4984        status->coll_id = jd->u.trove.fsid;
4985        status->handle = jd->u.trove.handle;
4986        status->position = jd->u.trove.position;
4987        status->count = jd->u.trove.count;
4988        status->type = jd->u.trove.type;
4989        break;
4990    case JOB_DEV_UNEXP:
4991        status->error_code = 0;
4992        status->actual_size = jd->u.dev_unexp.info->size;
4993        break;
4994    case JOB_REQ_SCHED_TIMER:
4995        status->error_code = jd->u.req_sched.error_code;
4996        break;
4997    case JOB_NULL:
4998        status->error_code = jd->u.null_info.error_code;
4999        break;
5000    case JOB_PRECREATE_POOL:
5001        status->error_code = jd->u.precreate_pool.error_code;
5002        status->count = jd->u.precreate_pool.count;
5003        status->position = jd->u.precreate_pool.pool_index << 32;
5004        status->position |= jd->u.precreate_pool.position;
5005        break;
5006    }
5007
5008    return;
5009}
5010
5011/* do_one_test_cycle_req_sched()
5012 *
5013 * tests the request scheduler to see if anything has completed.
5014 * Does not block at all.
5015 *
5016 * returns 0 on success, -errno on failure
5017 */
5018static int do_one_test_cycle_req_sched(void)
5019{
5020    int ret = -1;
5021    int count = job_work_metric;
5022    req_sched_id id_array[job_work_metric];
5023    void *user_ptr_array[job_work_metric];
5024    int error_code_array[job_work_metric];
5025    int i;
5026    struct job_desc *tmp_desc = NULL;
5027
5028
5029    ret = PINT_req_sched_testworld(&count, id_array,
5030                                   user_ptr_array, error_code_array);
5031
5032    if (ret < 0)
5033    {
5034        /* critical failure */
5035        /* TODO: can I clean up anything else here? */
5036        gossip_lerr("Error: critical request scheduler failure.\n");
5037        return (ret);
5038    }
5039
5040    for (i = 0; i < count; i++)
5041    {
5042        /* remove operation from queue */
5043        tmp_desc = (struct job_desc *) user_ptr_array[i];
5044        /* set appropriate fields and place in completed queue */
5045        tmp_desc->u.req_sched.error_code = error_code_array[i];
5046        gen_mutex_lock(&completion_mutex);
5047        /* set completed flag while holding queue lock */
5048        tmp_desc->completed_flag = 1;
5049        job_desc_q_add(completion_queue_array[tmp_desc->context_id],
5050            tmp_desc);
5051        gen_mutex_unlock(&completion_mutex);
5052    }
5053
5054    return (0);
5055}
5056
5057
5058/*
5059 * Appears to return <0 if problem, 0 if not done, 1 if done.
5060 */
5061static int completion_query_some(job_id_t * id_array,
5062                                 int *inout_count_p,
5063                                 int *out_index_array,
5064                                 void **returned_user_ptr_array,
5065                                 job_status_s * out_status_array_p)
5066{
5067    int i;
5068    struct job_desc *tmp_desc;
5069    int incount = *inout_count_p;
5070    int real_id_count = 0;
5071    int done_count = 0;
5072
5073    *inout_count_p = 0;
5074
5075    if (completion_error)
5076    {
5077        return (completion_error);
5078    }
5079
5080    /* count how many of the id's are non zero */
5081    for (i = 0; i < incount; i++)
5082    {
5083        if (id_array[i])
5084        {
5085            real_id_count++;
5086        }
5087    }
5088
5089    if (!real_id_count)
5090    {
5091        gossip_lerr("Error: job_testXXX() called with no valid ids.\n");
5092        return (-EINVAL);
5093    }
5094
5095    /* don't do anything unless all of the target ops are done */
5096    for(i=0; i<incount; i++)
5097    {
5098        tmp_desc = id_gen_safe_lookup(id_array[i]);
5099        if(tmp_desc && tmp_desc->completed_flag)
5100        {
5101            done_count++;
5102        }
5103    }
5104
5105    if(done_count < real_id_count)
5106    {
5107        return(0);
5108    }
5109
5110    /* all target ops are complete; pull them out of completion queue */
5111    for(i=0; i<incount; i++)
5112    {
5113        tmp_desc = id_gen_safe_lookup(id_array[i]);
5114        if(tmp_desc && tmp_desc->completed_flag)
5115        {
5116            if(returned_user_ptr_array)
5117            {
5118                fill_status(tmp_desc,
5119                    &(returned_user_ptr_array[*inout_count_p]),
5120                    &(out_status_array_p[*inout_count_p]));
5121            }
5122            else
5123            {
5124                fill_status(tmp_desc, NULL,
5125                    &(out_status_array_p[*inout_count_p]));
5126            }
5127            job_desc_q_remove(tmp_desc);
5128            if (tmp_desc->type == JOB_REQ_SCHED &&
5129                tmp_desc->u.req_sched.post_flag == 1)
5130            {
5131                /* special case; don't delete job desc for req sched
5132                 * jobs; we need to hang on to them for use when entry
5133                 * is released
5134                 */
5135            }
5136            else
5137            {
5138                dealloc_job_desc(tmp_desc);
5139                tmp_desc = NULL;
5140            }
5141            out_index_array[*inout_count_p] = i;
5142            (*inout_count_p)++;
5143        }
5144    }
5145
5146    /* we better not have lost any ops since the first loop through the job
5147     * list
5148     */
5149    assert((*inout_count_p) == done_count);
5150
5151    return(1);
5152}
5153
5154/* completion_query_context()
5155 *
5156 * retrieves completed jobs from specified context
5157 *
5158 * returns 1 if anything completed, 0 otherwise
5159 */
5160static int completion_query_context(job_id_t * out_id_array_p,
5161                                  int *inout_count_p,
5162                                  void **returned_user_ptr_array,
5163                                  job_status_s *
5164                                  out_status_array_p,
5165                                  job_context_id context_id)
5166{
5167    struct job_desc *query;
5168    int incount = *inout_count_p;
5169    *inout_count_p = 0;
5170
5171    if (completion_error)
5172    {
5173        return (completion_error);
5174    }
5175    while (*inout_count_p < incount && (query =
5176                                        job_desc_q_shownext(
5177                                        completion_queue_array[context_id])))
5178    {
5179        assert(query);
5180
5181        if (returned_user_ptr_array)
5182        {
5183            fill_status(query, &(returned_user_ptr_array[*inout_count_p]),
5184                        &(out_status_array_p[*inout_count_p]));
5185        }
5186        else
5187        {
5188            fill_status(query, NULL, &(out_status_array_p[*inout_count_p]));
5189        }
5190        out_id_array_p[*inout_count_p] = query->job_id;
5191        job_desc_q_remove(query);
5192        (*inout_count_p)++;
5193        /* special case for request scheduler */
5194        if (query->type == JOB_REQ_SCHED && query->u.req_sched.post_flag == 1)
5195        {
5196            /* special case; don't delete job desc for req sched
5197             * jobs; we need to hang on to them for use when entry
5198             * is released
5199             */
5200        }
5201        else
5202        {
5203            dealloc_job_desc(query);
5204            query = NULL;
5205        }
5206    }
5207
5208    if((*inout_count_p) > 0)
5209    {
5210        return(1);
5211    }
5212    else
5213    {
5214        return (0);
5215    }
5216}
5217
5218#ifndef __PVFS2_JOB_THREADED__
5219/* do_one_work_cycle_all()
5220 *
5221 * makes progress when threads are not used
5222 *
5223 * no return value
5224 */
5225static void do_one_work_cycle_all(int idle_time_ms)
5226{
5227    int total_pending_count = 0;
5228   
5229    gen_mutex_lock(&work_cycle_mutex);
5230
5231    total_pending_count = bmi_pending_count + bmi_unexp_pending_count
5232        + flow_pending_count + dev_unexp_pending_count + trove_pending_count;
5233
5234    if (bmi_pending_count || bmi_unexp_pending_count || flow_pending_count)
5235    {
5236        PINT_thread_mgr_bmi_push(idle_time_ms);
5237        idle_time_ms = 0;
5238    }
5239    if (dev_unexp_pending_count)
5240    {
5241        PINT_thread_mgr_dev_push(idle_time_ms);
5242    }
5243#ifdef __PVFS2_TROVE_SUPPORT__
5244    if(trove_pending_count || flow_pending_count)
5245        PINT_thread_mgr_trove_push(idle_time_ms);
5246#endif
5247
5248    if(total_pending_count == 0 && idle_time_ms != 0)
5249    {
5250        /* The caller would like for us to idle if necessary, but we really
5251         * don't have a single thing to do.  Sleep here to prevent busy
5252         * spins.
5253         */
5254        struct timespec ts;
5255         ts.tv_sec = idle_time_ms/1000;
5256         ts.tv_nsec = (idle_time_ms%1000)*1000*1000;
5257         nanosleep(&ts, NULL);
5258    }
5259
5260    gen_mutex_unlock(&work_cycle_mutex);
5261    return;
5262}
5263#endif
5264
5265/* flow_callback()
5266 *
5267 * function to be called upon completion of flows
5268 *
5269 * no return value
5270 */
5271static void flow_callback(flow_descriptor* flow_d, int cancel_path)
5272{
5273    struct job_desc* tmp_desc = (struct job_desc*)flow_d->user_ptr;
5274
5275    gen_mutex_lock(&initialized_mutex);
5276    if(initialized == 0)
5277    {
5278        /* The job interface has been shutdown.  Silently ignore callback. */
5279        gen_mutex_unlock(&initialized_mutex);
5280        return;
5281    }
5282    gen_mutex_unlock(&initialized_mutex);
5283
5284    /* set job descriptor fields and put into completion queue */
5285
5286    /* if this is being triggered directly from PINT_flow_cancel(), then the
5287     * completion mutex is already held by the caller; skip the mutex.
5288     */
5289    if(!cancel_path)
5290        gen_mutex_lock(&completion_mutex);
5291    job_desc_q_add(completion_queue_array[tmp_desc->context_id],
5292                   tmp_desc);
5293    /* set completed flag while holding queue lock */
5294    tmp_desc->completed_flag = 1;
5295
5296    flow_pending_count--;
5297    gossip_debug(GOSSIP_FLOW_DEBUG, "Job flows in progress (callback time): %d\n",
5298            flow_pending_count);
5299
5300#ifdef __PVFS2_JOB_THREADED__
5301    /* wake up anyone waiting for completion */
5302    pthread_cond_signal(&completion_cond);
5303#endif
5304    if(!cancel_path)
5305        gen_mutex_unlock(&completion_mutex);
5306
5307    return;
5308}
5309
5310#ifdef __PVFS2_TROVE_SUPPORT__
5311
5312/* job_precreate_pool_fill_signal_error()
5313 *
5314 * used for the entity responsible for filling the pool to indicate when
5315 * there are errors preventing it from making progress.  The error_code will
5316 * be propigated to get_handles() callers that are sleeping if the pool is
5317 * empty
5318 *
5319 * returns 0 on success, 1 on immediate completion, and -PVFS_errno on
5320 * failure
5321 */
5322int job_precreate_pool_fill_signal_error(
5323    PVFS_handle precreate_pool,
5324    PVFS_fs_id fsid,
5325    int error_code,
5326    void *user_ptr,
5327    job_aint status_user_tag,
5328    job_status_s * out_status_p,
5329    job_id_t * id,
5330    job_context_id context_id)
5331{
5332    struct job_desc* jd_checker;
5333    struct qlist_head* iterator;
5334    struct qlist_head* scratch;
5335
5336    gossip_debug(GOSSIP_FLOW_DEBUG, "job_precreate_pool_fill_signal_error() called.\n");
5337    /* note: this function always processes immediately (returns 1) */
5338
5339    gen_mutex_lock(&precreate_pool_mutex);
5340    /* see if anyone is waiting on pool handles */
5341    qlist_for_each_safe(iterator, scratch, &precreate_pool_get_handles_list)
5342    {
5343        jd_checker = qlist_entry(iterator, struct job_desc,
5344            job_desc_q_link);
5345
5346        qlist_del(&jd_checker->job_desc_q_link);
5347
5348        gossip_debug(GOSSIP_FLOW_DEBUG, "job_precreate_pool_fill_signal_error() waking up a get_handles() caller.\n");
5349        gen_mutex_lock(&completion_mutex);
5350
5351        /* set job descriptor fields and put into completion queue */
5352        jd_checker->u.precreate_pool.error_code = error_code;
5353        job_desc_q_add(completion_queue_array[jd_checker->context_id],
5354                       jd_checker);
5355        /* set completed flag while holding queue lock */
5356        jd_checker->completed_flag = 1;
5357
5358#ifdef __PVFS2_JOB_THREADED__
5359        /* wake up anyone waiting for completion */
5360        pthread_cond_signal(&completion_cond);
5361#endif
5362        gen_mutex_unlock(&completion_mutex);
5363    }
5364    gen_mutex_unlock(&precreate_pool_mutex);
5365
5366    out_status_p->error_code = 0;
5367    return(1);
5368}
5369
5370/* job_precreate_pool_fill()
5371 *
5372 * fills in handles for a precreate pool
5373 *
5374 * returns 0 on success, 1 on immediate completion, and -PVFS_errno on
5375 * failure
5376 */
5377int job_precreate_pool_fill(
5378    PVFS_handle precreate_pool,
5379    PVFS_fs_id fsid,
5380    PVFS_handle* precreate_handle_array,
5381    int precreate_handle_count,
5382    void *user_ptr,
5383    job_aint status_user_tag,
5384    job_status_s * out_status_p,
5385    job_id_t * id,
5386    job_context_id context_id,
5387    PVFS_hint hints)
5388{
5389    struct job_desc *jd = NULL;
5390
5391    gossip_debug(GOSSIP_JOB_DEBUG, "job_precreate_pool_fill() called.\n");
5392
5393    /* create the job desc first, even though we may not use it.  This
5394     * gives us somewhere to store information
5395     */
5396    jd = alloc_job_desc(JOB_PRECREATE_POOL);
5397    if (!jd)
5398    {
5399        return (-errno);
5400    }
5401    jd->job_user_ptr = user_ptr;
5402    jd->context_id = context_id;
5403    jd->status_user_tag = status_user_tag;
5404    jd->hints = hints;
5405    jd->trove_callback.fn = precreate_pool_fill_thread_mgr_callback;
5406    jd->trove_callback.data = (void*)jd;
5407    jd->u.precreate_pool.precreate_pool = precreate_pool;
5408    jd->u.precreate_pool.precreate_handle_array = precreate_handle_array;
5409    jd->u.precreate_pool.precreate_handle_count = precreate_handle_count;
5410    jd->u.precreate_pool.precreate_handle_index = 0;
5411    jd->u.precreate_pool.first_callback_flag = 1;
5412    jd->u.precreate_pool.fsid = fsid;
5413    jd->u.precreate_pool.key_array =
5414        malloc(PRECREATE_POOL_MAX_KEYS*sizeof(TROVE_keyval_s));
5415    if(!jd->u.precreate_pool.key_array)
5416    {
5417        dealloc_job_desc(jd);
5418        out_status_p->error_code = -PVFS_ENOMEM;
5419        return(1);
5420    }
5421
5422    /* reuse the logic for trove op completion to get this started */
5423    precreate_pool_fill_thread_mgr_callback(jd, 0);
5424
5425    /* for the moment, this type of job cannot immediately complete */
5426
5427    *id = jd->job_id;
5428    return (0);
5429}
5430 
5431/* job_precreate_pool_lookup_server()
5432 *
5433 * resolves a string hostname into a pool handle
5434 */
5435int job_precreate_pool_lookup_server(
5436    const char* host,
5437    PVFS_fs_id fsid,
5438    PVFS_handle* pool_handle)
5439{
5440    struct precreate_pool* pool;
5441    struct qlist_head* iterator;
5442    struct fs_pool* fs;
5443
5444    gen_mutex_lock(&precreate_pool_mutex);
5445
5446    fs = find_fs(fsid);
5447    assert(fs);
5448
5449    /* check pool list, go back to sleep if any are empty */
5450    qlist_for_each(iterator, &fs->precreate_pool_list)
5451    {
5452        pool = qlist_entry(iterator, struct precreate_pool,
5453            list_link);
5454        if(!strcmp(pool->host, host))
5455        {
5456            *pool_handle = pool->pool_handle;
5457            gen_mutex_unlock(&precreate_pool_mutex);
5458            return(0);
5459        }
5460    }
5461    gen_mutex_unlock(&precreate_pool_mutex);
5462
5463    return(-PVFS_ENOENT);
5464}
5465   
5466/* job_precreate_pool_set_index()
5467 *
5468 * used to assign a unique offset into the list of servers on each daemon in
5469 * the file system, so that load is balanced evenly
5470 */
5471void job_precreate_pool_set_index(
5472    int server_index)
5473{
5474    struct qlist_head* iterator;
5475    struct qlist_head* iterator2;
5476    int num_pools = 0;
5477    int pool_index = 0;
5478    int current_index = 0;
5479    struct fs_pool* fs;
5480
5481    gen_mutex_lock(&precreate_pool_mutex);
5482
5483    qlist_for_each(iterator2, &precreate_pool_fs_list)
5484    {
5485        num_pools = 0;
5486        pool_index = 0;
5487        current_index = 0;
5488
5489        fs = qlist_entry(iterator2, struct fs_pool, list_link);
5490
5491        qlist_for_each(iterator, &fs->precreate_pool_list)
5492        {
5493            num_pools++;
5494        }
5495       
5496        if(num_pools == 0)
5497        {
5498            pool_index = 0;
5499        }
5500        else
5501        {
5502            pool_index = server_index % num_pools;
5503        }
5504
5505        qlist_for_each(iterator, &fs->precreate_pool_list)
5506        {
5507            if(current_index == pool_index)
5508            {
5509                fs->precreate_pool_initial = iterator;
5510                break;
5511            }
5512            current_index++;
5513        }
5514
5515        /* safety check, should not hit this case */
5516        if(!fs->precreate_pool_initial)
5517        {
5518            fs->precreate_pool_initial = fs->precreate_pool_list.next;
5519        }
5520    }
5521
5522    gen_mutex_unlock(&precreate_pool_mutex);
5523
5524    return;
5525}
5526 
5527int job_precreate_pool_register_server(
5528    const char* host,
5529    PVFS_fs_id fsid,
5530    PVFS_handle pool_handle,
5531    int count)
5532{
5533    struct precreate_pool* tmp_pool;
5534    struct fs_pool* fs;
5535
5536    /* create a little struct to track the pool information for this peer
5537     * server
5538     */
5539    tmp_pool = malloc(sizeof(*tmp_pool));
5540    if(!tmp_pool)
5541    {
5542        return(-ENOMEM);
5543    }
5544
5545    tmp_pool->host = strdup(host);
5546    if(!tmp_pool->host)
5547    {
5548        free(tmp_pool);
5549        return(-ENOMEM);
5550    }
5551
5552    tmp_pool->pool_handle = pool_handle;
5553    tmp_pool->pool_count = count;
5554    gossip_debug(GOSSIP_JOB_DEBUG,
5555        "Pool count for handle %llu initially set to %d\n",
5556        llu(tmp_pool->pool_handle),
5557        tmp_pool->pool_count);
5558
5559    gossip_debug(GOSSIP_JOB_DEBUG,
5560        "Initial pool count for host %s, fsid %d: %d\n", host, (int)fsid,
5561        count);
5562
5563    /* search through file systems to see if we have registered anything for
5564     * this fsid yet
5565     */
5566    fs = find_fs(fsid);
5567    if(!fs)
5568    {
5569        /* allocate a new structure for this fsid */
5570        fs = malloc(sizeof(*fs));
5571        if(!fs)
5572        {
5573            free(tmp_pool->host);
5574            free(tmp_pool);
5575            return(-ENOMEM);
5576        }
5577        memset(fs, 0, sizeof(*fs));
5578        fs->fsid = fsid;
5579        fs->precreate_pool_initial = NULL;
5580        INIT_QLIST_HEAD(&fs->precreate_pool_list);
5581        qlist_add(&fs->list_link, &precreate_pool_fs_list);
5582    }
5583
5584    /* stash the info where we can search and find it later */
5585    qlist_add(&tmp_pool->list_link, &fs->precreate_pool_list);
5586
5587    return(1);
5588}
5589   
5590/* job_precreate_pool_check_level()
5591 *
5592 * checks to see if the current pool level is below a specified threshold
5593 *
5594 * returns 1 on immediate completion, 0 if level is not low enough yet
5595 */
5596int job_precreate_pool_check_level(
5597    PVFS_handle precreate_pool,
5598    PVFS_fs_id fsid,
5599    int low_threshold,
5600    void *user_ptr,
5601    job_aint status_user_tag,
5602    job_status_s * out_status_p,
5603    job_id_t * id,
5604    job_context_id context_id)
5605{
5606    struct qlist_head* iterator;
5607    struct precreate_pool* pool;
5608    struct job_desc *jd = NULL;
5609    struct fs_pool* fs;
5610
5611    gen_mutex_lock(&precreate_pool_mutex);
5612
5613    fs = find_fs(fsid);
5614    assert(fs);
5615
5616    qlist_for_each(iterator, &fs->precreate_pool_list)
5617    {
5618        pool = qlist_entry(iterator, struct precreate_pool,
5619            list_link);
5620        if(pool->pool_handle == precreate_pool)
5621        {
5622            if(pool->pool_count < low_threshold)
5623            {
5624                /* handle count is below the low threshold */
5625                out_status_p->error_code = 0;
5626                gen_mutex_unlock(&precreate_pool_mutex);
5627                gossip_debug(GOSSIP_JOB_DEBUG, "found pool count low.\n");
5628                return(1);
5629            }
5630            else
5631            {
5632                /* we are above threshold right now; queue up until it drops */
5633                jd = alloc_job_desc(JOB_PRECREATE_POOL);
5634                if (!jd)
5635                {
5636                    out_status_p->error_code = -PVFS_ENOMEM;
5637                    gen_mutex_unlock(&precreate_pool_mutex);
5638                    return(1);
5639                }
5640                jd->job_user_ptr = user_ptr;
5641                jd->context_id = context_id;
5642                jd->status_user_tag = status_user_tag;
5643                jd->u.precreate_pool.precreate_pool = precreate_pool;
5644                jd->u.precreate_pool.fsid = fsid;
5645                jd->u.precreate_pool.low_threshold = low_threshold;
5646                *id = jd->job_id;
5647
5648                qlist_add(&jd->job_desc_q_link, &precreate_pool_check_level_list);
5649                gen_mutex_unlock(&precreate_pool_mutex);
5650                gossip_debug(GOSSIP_JOB_DEBUG, "found pool count high.\n");
5651                return(0);
5652            }
5653            break;
5654        }
5655    }
5656    gen_mutex_unlock(&precreate_pool_mutex);
5657
5658    return(-PVFS_EINVAL);
5659}
5660 
5661/* job_precreate_pool_get_handles()
5662 *
5663 * Retrieves a set of datafile handles from one or more precreate pools.
5664 * Servers may be specified using bmi addresses in the servers array.  If
5665 * servers is NULL, then it will provide handles from pools in round robin
5666 * manner.
5667 *
5668 * returns 0 on success, 1 on immediate completion, and -PVFS_errno on failure
5669 */
5670int job_precreate_pool_get_handles(
5671    PVFS_fs_id fsid,
5672    int count,
5673    const char** servers,
5674    PVFS_handle* handle_array,
5675    PVFS_ds_flags flags,
5676    void *user_ptr,
5677    job_aint status_user_tag,
5678    job_status_s * out_status_p,
5679    job_id_t * id,
5680    job_context_id context_id,
5681    PVFS_hint hints)
5682{
5683    struct job_desc *jd = NULL;
5684    struct fs_pool* fs;
5685
5686    if(count < 0)
5687    {
5688        out_status_p->error_code = -PVFS_EINVAL;
5689        return(1);
5690    }
5691
5692    jd = alloc_job_desc(JOB_PRECREATE_POOL);
5693    if (!jd)
5694    {
5695        out_status_p->error_code = -PVFS_ENOMEM;
5696        return(1);
5697    }
5698    jd->job_user_ptr = user_ptr;
5699    jd->context_id = context_id;
5700    jd->hints = hints;
5701    jd->status_user_tag = status_user_tag;
5702    jd->u.precreate_pool.precreate_handle_array = handle_array;
5703    jd->u.precreate_pool.precreate_handle_count = count;
5704    jd->u.precreate_pool.precreate_handle_index = 0;
5705    jd->u.precreate_pool.fsid = fsid;
5706    jd->u.precreate_pool.servers = servers;
5707    jd->u.precreate_pool.trove_pending = 0;
5708    jd->u.precreate_pool.flags = flags;
5709
5710    /* rotate to use a different starting server in the pool next time */
5711    gen_mutex_lock(&precreate_pool_mutex);
5712    fs = find_fs(fsid);
5713    assert(fs);
5714    jd->u.precreate_pool.current_pool = fs->precreate_pool_initial;
5715    fs->precreate_pool_initial = fs->precreate_pool_initial->next;
5716    gen_mutex_unlock(&precreate_pool_mutex);
5717   
5718    precreate_pool_get_handles_try_post(jd);
5719
5720    /* for the moment, this type of job cannot immediately complete */
5721    *id = jd->job_id;
5722    return(0);
5723}
5724
5725/* precreate_pool_get_handles_try_post()
5726 *
5727 * Internal function used by job_precreate_pool_get_handles().  This
5728 * function will check to see if all pools are ready (at least one handle
5729 * available) and then post all required trove operations
5730 *
5731 * no return value
5732 */
5733static void precreate_pool_get_handles_try_post(struct job_desc* jd)
5734{
5735    struct precreate_pool* pool;
5736    TROVE_op_id tmp_id;
5737    int ret;
5738    struct precreate_pool_get_trove* tmp_trove_array;
5739    struct qlist_head* iterator;
5740    struct qlist_head* scratch;
5741    struct job_desc* jd_checker;
5742    int i;
5743    struct fs_pool* fs;
5744
5745    gossip_debug(GOSSIP_JOB_DEBUG, "precreate_pool_get_handles_try_post\n");
5746
5747    gen_mutex_lock(&precreate_pool_mutex);
5748
5749    fs = find_fs(jd->u.precreate_pool.fsid);
5750    assert(fs);
5751
5752    /* check pool list, go back to sleep if any are empty */
5753    qlist_for_each(iterator, &fs->precreate_pool_list)
5754    {
5755        pool = qlist_entry(iterator, struct precreate_pool,
5756            list_link);
5757        if(pool->pool_count < 1)
5758        {
5759            /* queue up until the count for this pool increases */
5760            qlist_add(&jd->job_desc_q_link, &precreate_pool_get_handles_list);
5761            gossip_debug(GOSSIP_JOB_DEBUG, "Found empty precreate pool %llu\n", llu(pool->pool_handle));
5762           
5763            gen_mutex_unlock(&precreate_pool_mutex);
5764            return;
5765        }
5766    }
5767
5768    /* if we get to this point, set up necessary information for all trove
5769     * operations needed to service job
5770     */
5771    tmp_trove_array = malloc(jd->u.precreate_pool.precreate_handle_count *
5772        sizeof(struct precreate_pool_get_trove));
5773    if(!tmp_trove_array)
5774    {
5775        gen_mutex_unlock(&precreate_pool_mutex);
5776        gen_mutex_lock(&completion_mutex);       
5777        jd->u.precreate_pool.error_code = -PVFS_ENOMEM;
5778        job_desc_q_add(completion_queue_array[jd->context_id], jd);
5779        jd->completed_flag = 1;
5780#ifdef __PVFS2_JOB_THREADED__
5781        /* wake up anyone waiting for completion */
5782        pthread_cond_signal(&completion_cond);
5783#endif
5784        gen_mutex_unlock(&completion_mutex);       
5785        return;
5786
5787    }
5788    jd->u.precreate_pool.data = tmp_trove_array;
5789
5790    /* translate reqested servers and set up necessary fields to post
5791     * trove operations
5792     */
5793    for(i=0; i<jd->u.precreate_pool.precreate_handle_count; i++)
5794    {
5795        if(jd->u.precreate_pool.servers)
5796        {
5797            /* caller wanted specific servers ; search through list and
5798             * set current pool to appropriate entry for this server
5799             */
5800            jd->u.precreate_pool.current_pool = NULL; /* sentinal */
5801            qlist_for_each(iterator, &fs->precreate_pool_list)
5802            {
5803                pool = qlist_entry(iterator, struct precreate_pool,
5804                    list_link);
5805                if(!strcmp(pool->host, jd->u.precreate_pool.servers[i]))
5806                {
5807                    jd->u.precreate_pool.current_pool = iterator;
5808                    break;
5809                }
5810            }
5811            if(!jd->u.precreate_pool.current_pool)
5812            {
5813                gossip_err("Error: get_handles(): unknown server: %s\n",
5814                    jd->u.precreate_pool.servers[i]);
5815
5816                free(tmp_trove_array);
5817                gen_mutex_unlock(&precreate_pool_mutex);
5818
5819                gen_mutex_lock(&completion_mutex);       
5820                jd->u.precreate_pool.error_code = -PVFS_EINVAL;
5821                job_desc_q_add(completion_queue_array[jd->context_id], jd);
5822                jd->completed_flag = 1;
5823        #ifdef __PVFS2_JOB_THREADED__
5824                /* wake up anyone waiting for completion */
5825                pthread_cond_signal(&completion_cond);
5826        #endif
5827                gen_mutex_unlock(&completion_mutex);       
5828                return;
5829            }
5830        }
5831        else
5832        {
5833            /* caller wants whatever we hand out */
5834            if(jd->u.precreate_pool.current_pool == NULL ||
5835                jd->u.precreate_pool.current_pool->next == &fs->precreate_pool_list)
5836            {
5837                /* either we are just starting, or we have wrapped around */
5838                jd->u.precreate_pool.current_pool = fs->precreate_pool_list.next;
5839            }
5840            else
5841            {
5842                /* normal case; cycle to next pool */
5843                jd->u.precreate_pool.current_pool =
5844                    jd->u.precreate_pool.current_pool->next;
5845            }
5846        }
5847
5848        tmp_trove_array[i].pool = qlist_entry(jd->u.precreate_pool.current_pool,
5849            struct precreate_pool, list_link);
5850
5851        tmp_trove_array[i].jd = jd;
5852        tmp_trove_array[i].pos = PVFS_ITERATE_START;
5853        tmp_trove_array[i].count = 1;
5854        tmp_trove_array[i].key.buffer
5855            = &jd->u.precreate_pool.precreate_handle_array[i];
5856        tmp_trove_array[i].key.buffer_sz = sizeof(PVFS_handle);
5857        tmp_trove_array[i].trove_callback.fn
5858            = precreate_pool_get_thread_mgr_callback;
5859        tmp_trove_array[i].trove_callback.data
5860            = &tmp_trove_array[i];
5861    }
5862
5863    /* post all trove operations at once */
5864    for(i=0; i<jd->u.precreate_pool.precreate_handle_count; i++)
5865    {
5866        /* go ahead and decrement count to avoid races with other consumers */
5867        tmp_trove_array[i].pool->pool_count--;
5868        gossip_debug(GOSSIP_JOB_DEBUG,
5869            "Pool count for handle %llu decremented to %d\n",
5870            llu(tmp_trove_array[i].pool->pool_handle),
5871            tmp_trove_array[i].pool->pool_count);
5872
5873        /* is anyone waiting to check the count of this pool? */
5874        if(!qlist_empty(&precreate_pool_check_level_list))
5875        {
5876            qlist_for_each_safe(iterator, scratch,
5877                &precreate_pool_check_level_list)
5878            {
5879                jd_checker = qlist_entry(iterator, struct job_desc,
5880                    job_desc_q_link);
5881                if(jd_checker->u.precreate_pool.precreate_pool ==
5882                    tmp_trove_array[i].pool->pool_handle &&
5883                    tmp_trove_array[i].pool->pool_count <
5884                    jd_checker->u.precreate_pool.low_threshold)
5885                {
5886                    /* the pool level is low */
5887                    gossip_debug(GOSSIP_JOB_DEBUG, "Pool count low, waking up waiter for handle %llu.\n", llu(jd_checker->u.precreate_pool.precreate_pool));
5888                    qlist_del(&jd_checker->job_desc_q_link);
5889
5890                    /* move waiting job to completion queue */
5891                    gen_mutex_lock(&completion_mutex);       
5892                    job_desc_q_add(completion_queue_array[jd->context_id], jd_checker);
5893                    jd->completed_flag = 1;
5894#ifdef __PVFS2_JOB_THREADED__
5895                    /* wake up anyone waiting for completion */
5896                    pthread_cond_signal(&completion_cond);
5897#endif
5898                    gen_mutex_unlock(&completion_mutex);       
5899                }
5900            }
5901        }
5902
5903        /* pre-increment pending count before posting trove operation */
5904        trove_pending_count++;
5905        jd->u.precreate_pool.trove_pending++;
5906
5907        /* post trove operation to pull out a handle */
5908        ret = trove_keyval_iterate_keys(
5909            fs->fsid,
5910            tmp_trove_array[i].pool->pool_handle,
5911            &tmp_trove_array[i].pos,
5912            &tmp_trove_array[i].key,
5913            &tmp_trove_array[i].count,
5914            tmp_trove_array[i].jd->u.precreate_pool.flags|
5915            TROVE_BINARY_KEY|
5916            TROVE_KEYVAL_HANDLE_COUNT|
5917            TROVE_KEYVAL_ITERATE_REMOVE,
5918            NULL,
5919            &tmp_trove_array[i].trove_callback,
5920            global_trove_context,
5921            &tmp_id,
5922            jd->hints);
5923        if(ret < 0)
5924        {
5925            precreate_pool_get_thread_mgr_callback_unlocked(
5926                &tmp_trove_array[i], ret);
5927        }
5928        else if(ret == 1)
5929        {
5930            precreate_pool_get_thread_mgr_callback_unlocked(
5931                &tmp_trove_array[i], 0);
5932        }
5933        else
5934        {
5935            /* callback will be triggered later */
5936        }
5937    }
5938    gen_mutex_unlock(&precreate_pool_mutex);
5939}
5940
5941/* job_precreate_pool_iterate_handles()
5942 *
5943 * similar to the trove iterate handles function, but returns all handles
5944 * stored in the precreate pools, including the handles for the pool objects
5945 * themselves.
5946 */
5947int job_precreate_pool_iterate_handles(
5948    PVFS_fs_id fsid,
5949    PVFS_ds_position position,
5950    PVFS_handle* handle_array,
5951    int count,
5952    PVFS_ds_flags flags,
5953    PVFS_vtag* vtag,
5954    void* user_ptr,
5955    job_aint status_user_tag,
5956    job_status_s* out_status_p,
5957    job_id_t* id,
5958    job_context_id context_id,
5959    PVFS_hint hints)
5960{
5961    PVFS_ds_position local_position;
5962    PVFS_ds_position pool_index;
5963    struct qlist_head* iterator;
5964    PVFS_ds_position tmp_index = 1;
5965    struct precreate_pool* pool = NULL;
5966    int ret;
5967    struct job_desc *jd = NULL;
5968    void* user_ptr_internal;
5969    TROVE_op_id tmp_id;
5970    int i;
5971    struct fs_pool* fs;
5972
5973    /* low order bits are the trove iterate position */
5974    local_position = position & 0xffffffff;
5975    /* high order bits tell us which pool we are on */
5976    pool_index = position >> 32;
5977
5978    /* we start indexing at one and reserve 0 for the special start and end
5979     * values for the entire set of pools
5980     */
5981    if(pool_index == 0)
5982    {
5983        if(local_position == PVFS_ITERATE_START)
5984        {
5985            pool_index = 1;
5986        }
5987        else
5988        {
5989            gossip_err("Error: invalid position given to job_precreate_pool_iterate_handles().\n");
5990            out_status_p->error_code = -PVFS_EINVAL;
5991            return(1);
5992        }
5993    }
5994
5995    gen_mutex_lock(&precreate_pool_mutex);
5996
5997    fs = find_fs(fsid);
5998    if(!fs)
5999    {
6000        /* no precreate pools available for the requested fs; stop iteration
6001         * right here
6002         */
6003        gen_mutex_unlock(&precreate_pool_mutex);
6004        out_status_p->error_code = 0;
6005        out_status_p->count = 0;
6006        out_status_p->position = PVFS_ITERATE_END;
6007        return(1);
6008    }
6009
6010    qlist_for_each(iterator, &fs->precreate_pool_list)
6011    {
6012        if(tmp_index == pool_index)
6013        {
6014            pool = qlist_entry(iterator, struct precreate_pool,
6015                list_link);
6016            break;
6017        }
6018        tmp_index++;
6019    }
6020
6021    if(!pool)
6022    {
6023        /* we ran out of pools; iteration is done */
6024        gen_mutex_unlock(&precreate_pool_mutex);
6025        out_status_p->error_code = 0;
6026        out_status_p->count = 0;
6027        out_status_p->position = PVFS_ITERATE_END;
6028        return(1);
6029    }
6030
6031    if(local_position == PVFS_ITERATE_END)
6032    {
6033        /* we got all of the handles out of the pool */
6034        /* pass back pool handle by itself and go to next pool */
6035        handle_array[0] = pool->pool_handle;
6036        /* skip to next pool */
6037        pool_index++;
6038        out_status_p->position = pool_index << 32;
6039        out_status_p->position |= PVFS_ITERATE_START;
6040        out_status_p->count = 1;
6041        out_status_p->error_code = 0;
6042        gen_mutex_unlock(&precreate_pool_mutex);
6043        return(1);
6044    }
6045
6046    /* get ready to post a job to trove to find handles */
6047    jd = alloc_job_desc(JOB_PRECREATE_POOL);
6048    if (!jd)
6049    {
6050        gen_mutex_unlock(&precreate_pool_mutex);
6051        out_status_p->error_code = -PVFS_ENOMEM;
6052        return 1;
6053    }
6054    jd->u.precreate_pool.key_array = malloc(count * sizeof(*jd->u.precreate_pool.key_array));
6055    if(!jd->u.precreate_pool.key_array)
6056    {
6057        gen_mutex_unlock(&precreate_pool_mutex);
6058        dealloc_job_desc(jd);
6059        out_status_p->error_code = -PVFS_ENOMEM;
6060        return 1;
6061    }
6062    for(i=0; i<count; i++)
6063    {
6064        jd->u.precreate_pool.key_array[i].buffer = &handle_array[i];
6065        jd->u.precreate_pool.key_array[i].buffer_sz = sizeof(handle_array[i]);
6066    }
6067    jd->job_user_ptr = user_ptr;
6068    jd->hints = hints;
6069    jd->u.precreate_pool.position = local_position;
6070    jd->u.precreate_pool.count = count;
6071    jd->u.precreate_pool.precreate_handle_array = handle_array;
6072    jd->u.precreate_pool.pool_index = pool_index;
6073    jd->context_id = context_id;
6074    jd->status_user_tag = status_user_tag;
6075    jd->trove_callback.fn = precreate_pool_iterate_callback;
6076    jd->trove_callback.data = (void*)jd;
6077    user_ptr_internal = &jd->trove_callback;
6078
6079#ifdef __PVFS2_TROVE_SUPPORT__
6080    ret = trove_keyval_iterate_keys(fsid, pool->pool_handle,
6081                               &(jd->u.precreate_pool.position),
6082                               jd->u.precreate_pool.key_array,
6083                               &(jd->u.precreate_pool.count), flags, NULL,
6084                               user_ptr_internal,
6085                               global_trove_context, &tmp_id, jd->hints);
6086#else
6087    gossip_err("Error: Trove support not enabled.\n");
6088    ret = -ENOSYS;
6089#endif
6090
6091    if (ret < 0)
6092    {
6093        /* error posting trove operation */
6094        free(jd->u.precreate_pool.key_array);
6095        dealloc_job_desc(jd);
6096        jd = NULL;
6097        out_status_p->error_code = ret;
6098        out_status_p->status_user_tag = status_user_tag;
6099        gen_mutex_unlock(&precreate_pool_mutex);
6100        return (1);
6101    }
6102
6103    if (ret == 1)
6104    {
6105        /* immediate completion */
6106        out_status_p->error_code = 0;
6107        out_status_p->status_user_tag = status_user_tag;
6108        out_status_p->position = pool_index << 32;
6109        out_status_p->position |= jd->u.precreate_pool.position;
6110        out_status_p->count = jd->u.precreate_pool.count;
6111        free(jd->u.precreate_pool.key_array);
6112        dealloc_job_desc(jd);
6113        jd = NULL;
6114        gen_mutex_unlock(&precreate_pool_mutex);
6115        return (ret);
6116    }
6117
6118    /* if we fall through to this point, the job did not
6119     * immediately complete and we must queue up to test later
6120     */
6121    *id = jd->job_id;
6122    trove_pending_count++;
6123    gen_mutex_unlock(&precreate_pool_mutex);
6124
6125    return (0);
6126}
6127
6128static struct fs_pool* find_fs(PVFS_fs_id fsid)
6129{
6130    struct fs_pool* fs;
6131    struct qlist_head* iterator;
6132
6133    qlist_for_each(iterator, &precreate_pool_fs_list)
6134    {
6135        fs = qlist_entry(iterator, struct fs_pool, list_link);
6136        if(fs->fsid == fsid)
6137        {
6138            return(fs);
6139        }
6140    }
6141    return(NULL);
6142}
6143
6144
6145#endif /* __PVFS2_TROVE_SUPPORT__ */
6146
6147/*
6148 * Local variables:
6149 *  c-indent-level: 4
6150 *  c-basic-offset: 4
6151 * End:
6152 *
6153 * vim: ts=8 sts=4 sw=4 expandtab
6154 */
Note: See TracBrowser for help on using the browser.