root/branches/as-branch/src/server/pipeline.sm @ 7825

Revision 7825, 24.7 KB (checked in by pcarns, 4 years ago)

fix undefined reference warnings for log2(), PINT_util_gen_credentials(),
and PINT_cached_config_map_to_server()

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/*
8 *  PVFS2 server state machine for driving read/write I/O operations.
9 */
10#define _ISOC99_SOURCE /* log2() */
11
12#include <string.h>
13#include <assert.h>
14#include <stdlib.h>
15#include <math.h> /* log2() */
16
17#include "server-config.h"
18#include "pvfs2-server.h"
19#include "pvfs2-attr.h"
20#include "pvfs2-request.h"
21#include "pint-distribution.h"
22#include "pvfs2-dist-simple-stripe.h"
23#include "pint-request.h"
24#include "pvfs2-internal.h"
25#include "trove.h"
26#include "pint-util.h"
27#include "pint-cached-config.h"
28
29#define LOOP 101
30#define UNALIGNED 102
31#define DO_COMP 103
32#define DO_ALLREDUCE 104
33#define CONTINUE_ALLREDUCE 105
34
35static int s2s_comp_fn(
36    void *v_p, struct PVFS_server_resp *resp_p, int index);
37
38%%
39
40nested machine pvfs2_pipeline_sm
41{
42    state fetch
43    {
44        run fetch_data;
45        success => dispatch;
46    }
47
48    state dispatch
49    {
50        run dispatch_data;
51        DO_COMP => check_align;
52        success => check_pipeline;
53    }
54
55    state check_align
56    {
57        run check_align_fn;
58        UNALIGNED => setup_s2s;
59        success => do_comp;
60    }
61
62    state setup_s2s
63    {
64        run setup_s2s_msg;
65        success => s2s_exchange;
66    }
67
68    state s2s_exchange
69    {
70        jump pvfs2_msgpairarray_sm;
71        success => do_comp;
72    }
73
74    state do_comp
75    {
76        run do_comp_fn;
77        DO_ALLREDUCE => setup_allreduce;
78        success => check_pipeline;
79    }
80
81    state setup_allreduce
82    {
83        pjmp setup_allreduce_sm
84        {
85            success => pvfs2_allreduce_sm;
86        }
87        success => cleanup_allreduce;
88    }
89
90    state cleanup_allreduce
91    {
92        run cleanup_allreduce_fn;
93        success => check_pipeline;
94    }
95
96    state check_pipeline
97    {
98        run check_pipeline_done;
99        LOOP => fetch;
100        success => return;
101    }
102}
103
104%%
105
106/*
107 * fetch data from either TROVE (in case of READ) or BMI (in case of WRITE)
108 *
109 *   PINT_segpool_take_segments()
110 *     => READ: job_trove_bstream_read_list()
111 *     => WRITE: job_bmi_recv()
112 */
113static PINT_sm_action fetch_data(struct PINT_smcb *smcb, job_status_s *js_p)
114{
115    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
116    PINT_segpool_handle_t seg_handle = s_op->u.pipeline.seg_handle;
117    PINT_segpool_unit_id id = s_op->u.pipeline.id;
118    struct server_configuration_s *user_opts = get_server_config_struct();
119    int count, ret, i;
120    PVFS_offset *offsets;
121    PVFS_size *sizes;
122    PVFS_size bytes;
123    job_id_t tmp_id;
124
125    gossip_debug(GOSSIP_IO_DEBUG, "smcb->base_frame=%d, frame_count=%d\n", smcb->base_frame, smcb->frame_count);
126    gossip_debug(GOSSIP_IO_DEBUG, "smcb=%p, smcb->parent_smcb=%p\n", smcb, smcb->parent_smcb);
127    s_op->u.pipeline.buffer_used = 0;
128    bytes = s_op->u.pipeline.buffer_size;
129
130    PINT_segpool_take_segments(seg_handle, id, &bytes, &count,
131                               &offsets, &sizes);
132    gossip_debug(GOSSIP_IO_DEBUG, "%s: %s: bytes=%lld, count=%d\n",
133                 __func__,
134                 (s_op->u.pipeline.io_type==PVFS_IO_READ?"READ":"WRITE"),
135                 lld(bytes), count);
136
137    for(i=0; i<count; i++) {
138        gossip_debug(GOSSIP_IO_DEBUG, "offsets[%d]=%lld, sizes[%d]=%lld\n",
139                     i, lld(offsets[i]), i, lld(sizes[i]));
140    }
141
142    if(count == 0) {
143        js_p->error_code = 0;
144        //gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
145        return SM_ACTION_COMPLETE;
146    }
147
148    s_op->u.pipeline.buffer_used = bytes;
149    s_op->u.pipeline.offsets = offsets;
150    s_op->u.pipeline.sizes = sizes;
151    s_op->u.pipeline.segs = count;
152
153    if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
154       
155        ret = job_trove_bstream_read_list
156            (s_op->u.pipeline.fs_id,
157             s_op->u.pipeline.handle,
158             (char **)&s_op->u.pipeline.buffer,
159             (PVFS_size *)&s_op->u.pipeline.buffer_used,
160             1,
161             offsets,
162             sizes,
163             count,
164             &s_op->u.pipeline.out_size,
165             s_op->u.pipeline.trove_sync_flag,
166             NULL,
167             smcb,
168             0,
169             js_p,
170             &tmp_id,
171             server_job_context,
172             s_op->u.pipeline.hints);
173    }
174    else if (s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
175        ret = job_bmi_recv(s_op->u.pipeline.address,
176                       (void *)s_op->u.pipeline.buffer,
177                       s_op->u.pipeline.buffer_size,
178                       s_op->u.pipeline.tag,
179                       BMI_PRE_ALLOC,
180                       smcb,
181                       0, /* unsigned long status_user_tag = 0 */
182                       js_p,
183                       &tmp_id,
184                       server_job_context,
185                       user_opts->server_job_flow_timeout,
186                       (bmi_hint)s_op->u.pipeline.hints);
187    }
188
189    if(ret < 0) {
190        gossip_err("%s: I/O error occurred\n", __func__);
191        /* FIXME */
192        //handle_io_error(ret, q_item, flow_data);
193        js_p->error_code = -PVFS_EIO;
194        return SM_ACTION_COMPLETE;
195    }
196
197    /* immediate return */
198    if(ret == 1) {
199        js_p->error_code = 1;
200        return SM_ACTION_COMPLETE;
201    }
202
203    if(ret == 0) {
204        js_p->error_code = 0;
205        return SM_ACTION_DEFERRED;
206    }
207
208    return SM_ACTION_COMPLETE;
209}
210
211/*
212 * Dispatch data to either BMI (in case of READ) or TROVE (in case of WRITE)
213 *
214 *   => READ: job_bmi_send()
215 *   => WRITE: job_trove_bstream_write_list()
216 */
217static PINT_sm_action dispatch_data(struct PINT_smcb *smcb, job_status_s *js_p)
218{
219    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
220    int ret;
221    job_id_t tmp_id;
222    struct server_configuration_s *user_opts = get_server_config_struct();
223    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
224
225    if(s_op->u.pipeline.segs == 0) {
226        js_p->error_code = 0;
227        gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
228        return SM_ACTION_COMPLETE;
229    }
230
231    gossip_debug(GOSSIP_IO_DEBUG, "%s: %s: buffer_used=%lld\n", __func__,
232                 (s_op->u.pipeline.io_type==PVFS_IO_READ?"READ":"WRITE"),
233                 lld(s_op->u.pipeline.buffer_used));
234#if 0
235    gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer[0]=%s\n", __func__,
236                 (char *)s_op->u.pipeline.buffer);
237#endif
238
239   
240    if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
241        assert(s_op->u.pipeline.buffer_used);
242
243        if(parent_s_op->u.io.op != 0) { /* AS: when op is specified */
244            ret = DO_COMP; /* AS: skip sending if op is specified */
245            gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
246            js_p->error_code = ret;
247            return SM_ACTION_COMPLETE;
248        }
249        else
250            ret = job_bmi_send(s_op->u.pipeline.address,
251                               s_op->u.pipeline.buffer,
252                               js_p->actual_size,
253                               s_op->u.pipeline.tag,
254                               BMI_PRE_ALLOC,
255                               0, /* send_unexpected */
256                               smcb, /* user_ptr */
257                               0, /* status_user_tag */
258                               js_p,
259                               &tmp_id,
260                               server_job_context,
261                               user_opts->server_job_bmi_timeout,
262                               (bmi_hint)s_op->u.pipeline.hints);
263       
264    }
265    else if(s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
266        ret = job_trove_bstream_write_list
267            (s_op->u.pipeline.fs_id,
268             s_op->u.pipeline.handle,
269             (char **)&s_op->u.pipeline.buffer,
270             (TROVE_size *)&js_p->actual_size,
271             1,
272             s_op->u.pipeline.offsets,
273             s_op->u.pipeline.sizes,
274             s_op->u.pipeline.segs,
275             &s_op->u.pipeline.out_size,
276             s_op->u.pipeline.trove_sync_flag,
277             NULL,
278             smcb,
279             0,
280             js_p,
281             &tmp_id,
282             server_job_context,
283             s_op->u.pipeline.hints);
284    }
285
286    if(ret < 0) {
287        gossip_err("%s: I/O error occurred\n", __func__);
288        /* FIXME !!!!!!! */
289        /* handle_io_error(ret, q_item, flow_data); */
290        js_p->error_code = ret;
291        return SM_ACTION_COMPLETE;
292    }
293   
294    /* immediate return */
295    if(ret == 1) {
296        js_p->error_code = ret;
297        return SM_ACTION_COMPLETE;
298    }
299       
300    if(ret == 0) {
301        js_p->error_code = ret;
302        return SM_ACTION_DEFERRED;
303    }
304
305    return SM_ACTION_COMPLETE;
306}
307
308
309static PINT_sm_action check_align_fn(struct PINT_smcb *smcb, job_status_s *js_p)
310{
311    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
312    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
313    js_p->error_code = 0;
314    PVFS_size file_req_offset = parent_s_op->u.io.file_req_offset;
315    PINT_request_file_data fdata = parent_s_op->u.io.file_data;
316    PVFS_simple_stripe_params *dparam =
317        (PVFS_simple_stripe_params*)fdata.dist->params;
318    PVFS_size count;
319    PVFS_offset strip_boundary;
320
321    PVFS_offset loff = fdata.dist->methods->physical_to_logical_offset(fdata.dist->params, &fdata, s_op->u.pipeline.offsets[0]);
322   
323    s_op->u.pipeline.loff = loff;
324    gossip_debug(GOSSIP_IO_DEBUG, "loff=%lld, file_req_offset=%lld\n", lld(loff), lld(file_req_offset));
325
326    switch(parent_s_op->u.io.datatype) {
327    case ((int)0x4c000405): /* MPI_INT */
328        count = (PVFS_size)(s_op->u.pipeline.buffer_used -
329                            file_req_offset)/((*PVFS_INT).ub);
330        gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(count));
331        strip_boundary = ((int)(loff/(dparam->strip_size)))*(dparam->strip_size);
332        s_op->u.pipeline.unaligned_size = loff-strip_boundary;
333       
334        if (loff == strip_boundary && file_req_offset != 0) {
335            s_op->u.pipeline.unaligned_size = file_req_offset;
336        }
337
338        if (s_op->u.pipeline.unaligned_size != 0 && count != 0) {
339            js_p->error_code = UNALIGNED;
340            gossip_debug(GOSSIP_IO_DEBUG, "unaligned_size=%lld\n",
341                         lld(s_op->u.pipeline.unaligned_size));
342        }
343    case ((int)0x4c00080b): /* MPI_DOUBLE */
344        count = (PVFS_size)(s_op->u.pipeline.buffer_used -
345                            file_req_offset)/((*PVFS_DOUBLE).ub);
346        gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(count));
347        strip_boundary = ((int)(loff/(dparam->strip_size)))*(dparam->strip_size); /* FIXME */
348        s_op->u.pipeline.unaligned_size = loff-strip_boundary;
349
350        if (loff == strip_boundary && file_req_offset != 0) {
351            s_op->u.pipeline.unaligned_size = file_req_offset;
352        }
353       
354        if (s_op->u.pipeline.unaligned_size != 0 && count != 0) {
355            js_p->error_code = UNALIGNED;
356            gossip_debug(GOSSIP_IO_DEBUG, "unaligned_size=%lld\n",
357                         lld(s_op->u.pipeline.unaligned_size));
358        }
359    }
360
361    return SM_ACTION_COMPLETE;
362}
363
364
365static PINT_sm_action setup_s2s_msg(struct PINT_smcb *smcb, job_status_s *js_p)
366{
367    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
368    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
369    PINT_sm_msgpair_state *msg_p = NULL;
370    struct server_configuration_s *user_opts = get_server_config_struct();
371    int regions;
372    int ret;
373    PVFS_credentials creds;
374    int next_server_index;
375    PVFS_handle next_server_handle;
376
377    /* init msgpair */
378    PINT_msgpair_init(&s_op->msgarray_op);
379    msg_p = &s_op->msgarray_op.msgpair;
380
381    s_op->msgarray_op.params.job_timeout = user_opts->client_job_bmi_timeout;
382    s_op->msgarray_op.params.retry_delay = user_opts->client_retry_delay_ms;
383    s_op->msgarray_op.params.retry_limit = user_opts->client_retry_limit;
384    s_op->msgarray_op.params.quiet_flag = 1;
385
386    PINT_util_gen_credentials(&creds);
387    gossip_debug(GOSSIP_IO_DEBUG, "loff=%lld, buffer_used=%lld\n", lld(s_op->u.pipeline.loff), lld(s_op->u.pipeline.buffer_used));
388
389    /* determine which server we need to talk to */
390    next_server_index = (s_op->u.pipeline.dfile_index + 1)%(s_op->u.pipeline.dfile_count);
391    next_server_handle = parent_s_op->u.io.dfile_array[next_server_index];
392
393    /* build a request */
394    ret = PVFS_Request_contiguous(s_op->u.pipeline.unaligned_size,
395                                  PVFS_BYTE, &s_op->u.pipeline.file_req);
396   
397    s_op->u.pipeline.file_req_offset = (((int)(s_op->u.pipeline.loff/262144))+1)*262144; /* FIXME */
398   
399    regions = 1;
400    gossip_debug(GOSSIP_IO_DEBUG, "s_op->u.pipeline.file_req_offset=%lld\n", lld(s_op->u.pipeline.file_req_offset));
401
402    PINT_SERVREQ_SMALL_IO_FILL(msg_p->req,
403                               creds,
404                               s_op->u.pipeline.fs_id,
405                               next_server_handle,
406                               s_op->u.pipeline.io_type,
407                               next_server_index,
408                               s_op->u.pipeline.dfile_count,
409                               s_op->u.pipeline.dist,
410                               s_op->u.pipeline.file_req,
411                               s_op->u.pipeline.file_req_offset,
412                               regions,
413                               s_op->u.pipeline.unaligned_size,
414                               NULL /* s_op->hints */);
415
416    msg_p->fs_id = s_op->u.pipeline.fs_id;
417    msg_p->handle = next_server_handle;
418    msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
419    msg_p->comp_fn = s2s_comp_fn;
420
421    ret = PINT_cached_config_map_to_server(&msg_p->svr_addr,
422                                           next_server_handle, //s_op->u.pipeline.handle,
423                                           s_op->u.pipeline.fs_id);
424    gossip_debug(GOSSIP_IO_DEBUG, "%s: msg_p->svr_addr=%llu\n", __func__,
425                 llu(msg_p->svr_addr));
426    if(ret < 0) {
427        gossip_err("Failed to map meta server address\n");
428        js_p->error_code = ret;
429        return SM_ACTION_COMPLETE;
430    }
431
432    js_p->error_code = 0;
433
434    PINT_sm_push_frame(smcb, 0, &s_op->msgarray_op);
435    return SM_ACTION_COMPLETE;
436}
437
438static int s2s_comp_fn(void *v_p, struct PVFS_server_resp *resp_p,
439                        int index)
440{
441    PINT_smcb *smcb = v_p;
442    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
443
444    gossip_debug(GOSSIP_IO_DEBUG, "%s called\n", __func__);
445    gossip_debug(GOSSIP_IO_DEBUG, "resp_p->status=%d\n", resp_p->status);
446    gossip_debug(GOSSIP_IO_DEBUG, "small_io: result_size=%lld\n",
447                     lld(resp_p->u.small_io.result_size));
448
449    assert(resp_p->op == PVFS_SERV_SMALL_IO);
450
451    if (resp_p->status != 0) {
452        return resp_p->status;
453    }
454
455    if(resp_p->u.small_io.result_size != 0) {
456        memcpy(s_op->u.pipeline.tmp_buf, resp_p->u.small_io.buffer,
457               resp_p->u.small_io.result_size);
458    }
459
460    return 0;
461}
462
463/* square of Euclid distance between two multi-dimensional points            */
464__inline static
465float euclid_dist_2(int    numdims,  /* no. dimensions */
466                    float *coord1,   /* [numdims] */
467                    float *coord2)   /* [numdims] */
468{
469  int i;
470  float ans=0.0;
471
472  for (i=0; i<numdims; i++)
473    ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]);
474
475  return(ans);
476}
477
478/*----< find_nearest_cluster() >---------------------------------------------*/
479__inline static
480int find_nearest_cluster(int     numClusters, /* no. clusters */
481                         int     numCoords,   /* no. coordinates */
482                         float  *object,      /* [numCoords] */
483                         float **clusters)    /* [numClusters][numCoords] */
484{
485  int   index, i;
486  float dist, min_dist;
487
488  /* find the cluster id that has min distance to object */
489  index    = 0;
490  min_dist = euclid_dist_2(numCoords, object, clusters[0]);
491
492  for (i=1; i<numClusters; i++) {
493    dist = euclid_dist_2(numCoords, object, clusters[i]);
494    /* no need square root */
495    if (dist < min_dist) { /* find the min and its array index */
496      min_dist = dist;
497      index    = i;
498    }
499  }
500  return(index);
501}
502
503static PINT_sm_action do_comp_fn(struct PINT_smcb *smcb, job_status_s *js_p)
504{
505    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
506    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
507    js_p->error_code = 0;
508
509    if(s_op->u.pipeline.buffer) {
510        PVFS_size i;
511        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
512                     "%s: buffer_used=%lld, op=0x%x, datatype=0x%x, actual_size=%lld\n",
513                     __func__, lld(s_op->u.pipeline.buffer_used),
514                     parent_s_op->u.io.op,
515                     parent_s_op->u.io.datatype,
516                     lld(js_p->actual_size)); /* AS */
517
518        switch(parent_s_op->u.io.datatype) {
519        case ((int)0x4c000405): /* MPI_INT */
520            {
521                int *a = (int*)s_op->u.pipeline.buffer;
522                int result;
523                PVFS_size count = (s_op->u.pipeline.buffer_used-s_op->u.pipeline.unaligned_size)/((*PVFS_INT).ub);
524                int *tmp;
525
526                if(count == 0)
527                    return SM_ACTION_COMPLETE;
528                /* data is not aligned perfectly, so adjust it */
529                if(s_op->u.pipeline.unaligned_size != 0) {
530                    memcpy(((char*)&a[count])+(((*PVFS_INT).ub)-s_op->u.pipeline.unaligned_size), s_op->u.pipeline.tmp_buf, s_op->u.pipeline.unaligned_size);
531                    count++;
532                }
533               
534                if (parent_s_op->u.io.total_transferred == 0) {
535                    if (parent_s_op->u.io.tmp_buffer == NULL)
536                        parent_s_op->u.io.tmp_buffer = (void *)malloc(1*sizeof(int));
537                    memset(parent_s_op->u.io.tmp_buffer, 0, sizeof(int));
538                    parent_s_op->u.io.count = 0;
539                }
540                tmp = parent_s_op->u.io.tmp_buffer;
541                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "total_transferred=%lld\n", lld(parent_s_op->u.io.total_transferred));
542                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%d\n", lld(count), *tmp);
543
544                switch(parent_s_op->u.io.op) {
545                case 0x58000001: /* MAX */
546                    result = *a;
547                    for (i=1; i<count; i++ ) {
548                        if (a[i] > result) {
549                            result = a[i];
550                        }
551                    }
552                    a[0] = result;
553                    if (parent_s_op->u.io.total_transferred == 0 ||
554                        result > *tmp)
555                        *tmp = result;
556                    break;
557                case 0x58000002: /* MIN */
558                    result = *a;
559                    for (i=1; i<count; i++ ) {
560                        if (a[i] < result) {
561                            result = a[i];
562                        }
563                    }
564                    a[0] = result;
565                    if (parent_s_op->u.io.total_transferred == 0 ||
566                        result < *tmp)
567                        *tmp = result;
568                    break;
569                case 0x58000003: /* SUM */
570                    result = 0;
571                    for (i=0; i<count; i++ ) {
572                        if (i<10)
573                            gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%d\n",
574                                         lld(i), a[i]);
575                        result += a[i];
576                    }
577                    a[0] = result;
578                    *tmp += result;
579                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%d\n",
580                                 *tmp);
581                    break;
582                case (0x5800000e): /* MEAN */
583                    result = 0;
584                    for (i=0; i<count; i++) {
585                        result += a[i];
586                    }
587                    result = result/count;
588                    a[0] = result;
589                   
590                    if (parent_s_op->u.io.count == 0)
591                        *tmp = result;
592                    else {
593                        int tmp_sum = (*tmp)*(parent_s_op->u.io.count);
594                        tmp_sum = tmp_sum + result*count;
595                        *tmp = (tmp_sum)/(parent_s_op->u.io.count+count);
596                    }
597                    parent_s_op->u.io.count += count;
598                    gossip_debug(GOSSIP_IO_DEBUG, "mean=%d\n", *tmp);
599                    break;
600                default:
601                    break;
602                }
603                s_op->u.pipeline.buffer = (void *)a;
604                parent_s_op->u.io.tmp_buffer = (void *)tmp;
605            }
606            break;
607
608        case ((int)0x4c00080b): /* MPI_DOUBLE */
609            {
610                double *a = (double*)s_op->u.pipeline.buffer;
611                double result;
612                PVFS_size count = (s_op->u.pipeline.buffer_used-s_op->u.pipeline.unaligned_size)/((*PVFS_DOUBLE).ub);
613                double *tmp;
614                PVFS_offset strip_boundary = (int)(s_op->u.pipeline.loff/262144)*262144; /* FIXME */
615
616                if (s_op->u.pipeline.buffer_used < 262144 &&
617                    s_op->u.pipeline.loff != strip_boundary) { /* FIXME */
618                    count = (s_op->u.pipeline.buffer_used)/((*PVFS_DOUBLE).ub);
619                }
620                gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(count));
621                if(count < 1)
622                    return SM_ACTION_COMPLETE;
623
624                /* data is not aligned perfectly, so adjust it within the memory*/
625                if(s_op->u.pipeline.unaligned_size != 0) {
626                    PVFS_size adj_sz = s_op->u.pipeline.unaligned_size;
627                    PVFS_size tmp_sz = ((*PVFS_DOUBLE).ub) - adj_sz;
628                   
629                    if(s_op->u.pipeline.loff == strip_boundary) {
630                        memcpy(a, (char*)&a[0]+adj_sz,
631                               s_op->u.pipeline.buffer_used-adj_sz);
632                    }
633
634                    memcpy(((char*)&a[count])+tmp_sz, s_op->u.pipeline.tmp_buf,
635                           s_op->u.pipeline.unaligned_size);
636
637
638                    count++;
639                   
640                    if(s_op->u.pipeline.buffer_used < (262144-((*PVFS_DOUBLE).ub)))
641                        count--;
642                }
643
644                if (parent_s_op->u.io.total_transferred == 0) {
645                    if (parent_s_op->u.io.tmp_buffer == NULL)
646                        parent_s_op->u.io.tmp_buffer = (void *)malloc(1*sizeof(double));
647                    memset( parent_s_op->u.io.tmp_buffer, 0, sizeof(double));
648                    parent_s_op->u.io.count = 0;
649                }
650                tmp = parent_s_op->u.io.tmp_buffer;
651                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
652                             "total_transferred=%lld\n",
653                             lld(parent_s_op->u.io.total_transferred));
654                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%lf\n",
655                             lld(count), *tmp);
656                switch(parent_s_op->u.io.op) {
657                case 0x58000001: /* MAX */
658                    result = *a;
659                    for (i=1; i<count; i++ ) {
660                        if (a[i] > result) {
661                            result = a[i];
662                        }
663                    }
664                    a[0] = result;
665                    if (parent_s_op->u.io.total_transferred == 0 ||
666                        result > *tmp)
667                        *tmp = result;
668                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "max=%lf\n", *tmp);
669                    break;
670                case 0x58000002: /* MIN */
671                    result = *a;
672                    for (i=1; i<count; i++ ) {
673                        if (a[i] < result) {
674                            result = a[i];
675                        }
676                    }
677
678                    a[0] = result;
679                    if (parent_s_op->u.io.total_transferred == 0 ||
680                        result < *tmp)
681                        *tmp = result;
682                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "min=%lf\n",
683                                 *tmp);
684                    break;
685                case 0x58000003: /* SUM */
686                    result = 0;
687                    for (i=0; i<count; i++ ) {
688                        if (i<10 || i== (count-1) || i == count) gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%lf\n", lld(i), a[i]);
689                        result += a[i];
690                    }
691
692                    a[0] = result;
693                    *tmp += result;
694                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%lf\n",
695                                 *tmp);
696                    break;   
697                case (0x5800000e): /* MEAN */
698                    result = 0;
699                    for (i=0; i<count; i++) {
700                        result += a[i];
701                    }
702                    result = result/count;
703                    a[0] = result;
704
705                    gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(parent_s_op->u.io.count));
706
707                    if (parent_s_op->u.io.count == 0)
708                        *tmp = result;
709                    else {
710                        double tmp_sum = (*tmp)*(parent_s_op->u.io.count);
711                        tmp_sum = tmp_sum + result*count;
712                        *tmp = (tmp_sum)/(parent_s_op->u.io.count+count);
713                    }
714                    parent_s_op->u.io.count += count;
715                    gossip_debug(GOSSIP_IO_DEBUG, "mean=%lf\n", result);
716                    break;
717                case (0x5800000f): /* KMEANS */ {
718                    int index, numObjs, j;
719                    int numClusters=2, numCoords=4;
720                    double delta;
721                   
722#if 0
723                    for(i=0; i<numObjs; i++) {
724                        //result = a[i];
725                        index = find_nearest_cluster(numClusters, numCoords, a[i], clusters);
726                        if(membership[i] != index) delta += 1.0;
727                        membership[i] = index;
728
729                        newClusterSize[index]++;
730                        for(j=0; j<numCoords; j++)
731                            newClusters[index][j] += objects[i][j];
732                    }
733#endif
734                    js_p->error_code = DO_ALLREDUCE;
735                    break;
736                }
737                default:
738                    break;
739                } /* end inner switch */
740                s_op->u.pipeline.buffer = (void *)a;
741                parent_s_op->u.io.tmp_buffer = (void *)tmp;
742            }
743           
744            break;
745        default:
746            break;
747        } /* end switch() */
748    } /* end if() */
749
750    return SM_ACTION_COMPLETE;
751}
752
753static PINT_sm_action setup_allreduce_sm(struct PINT_smcb *smcb, job_status_s *js_p)
754{
755    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
756    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
757    int ret, tmp_id;
758    struct server_configuration_s *user_opts = get_server_config_struct();
759    PVFS_handle new_rank_handle;
760    PVFS_BMI_addr_t svr_addr;
761    struct PINT_server_op *allreduce_op;
762    int i;
763
764    allreduce_op = malloc(sizeof(*allreduce_op));
765    memset(allreduce_op, 0, sizeof(*allreduce_op));
766    allreduce_op->u.allreduce.fs_id = s_op->u.pipeline.fs_id;
767    allreduce_op->u.allreduce.myRank = s_op->u.pipeline.dfile_index;
768    allreduce_op->u.allreduce.recv_buf = (void*)malloc(5*sizeof(double));
769    memset(allreduce_op->u.allreduce.recv_buf, 0, 5*sizeof(double));
770    allreduce_op->u.allreduce.tree_depth = log2(s_op->u.pipeline.dfile_count); /* FIXME !!!! */
771    gossip_debug(GOSSIP_IO_DEBUG, "tree_depth=%d\n",
772                 allreduce_op->u.allreduce.tree_depth);
773    allreduce_op->u.allreduce.current_depth = 0;
774    allreduce_op->u.allreduce.mask = 0x1;
775    allreduce_op->u.allreduce.dfile_array = parent_s_op->u.io.dfile_array;
776    allreduce_op->u.allreduce.send_buf = (void*)malloc(5*sizeof(double));
777    memset(allreduce_op->u.allreduce.send_buf, 0, 5*sizeof(double));
778
779    ret = PINT_sm_push_frame(smcb, 0, allreduce_op);
780   
781    js_p->error_code = 0;
782    return SM_ACTION_COMPLETE;
783}
784
785static PINT_sm_action cleanup_allreduce_fn(struct PINT_smcb *smcb, job_status_s *js_p)
786{
787    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
788    int task_id;
789    int remaining;
790    PVFS_error tmp_err;
791    struct PINT_server_op *allreduce_op;
792
793    allreduce_op = PINT_sm_pop_frame(smcb, &task_id, &tmp_err,
794            &remaining);
795    gossip_debug(GOSSIP_SERVER_DEBUG,
796                 "pipeline: nested sm returned error code: %d\n", tmp_err);
797    memcpy(s_op->u.pipeline.buffer, allreduce_op->u.allreduce.send_buf,
798           5*sizeof(double));
799    free(allreduce_op->u.allreduce.send_buf);
800    free(allreduce_op->u.allreduce.recv_buf);
801    free(allreduce_op);
802
803    js_p->error_code = 0;
804    return SM_ACTION_COMPLETE;
805}
806
807static PINT_sm_action check_pipeline_done(struct PINT_smcb *smcb, job_status_s *js_p)
808{
809    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
810    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
811    PINT_segpool_handle_t h = s_op->u.pipeline.seg_handle;
812    js_p->error_code = 0;
813
814    /* FIMXE: do we really need this lock? */
815    gen_mutex_lock(&parent_s_op->u.io.mutex);
816    //parent_s_op->u.io.total_transferred += js_p->actual_size;
817    parent_s_op->u.io.total_transferred += s_op->u.pipeline.buffer_used;
818    gen_mutex_unlock(&parent_s_op->u.io.mutex);
819   
820    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%lld\n", __func__,
821                 lld(parent_s_op->u.io.total_transferred));
822    gossip_debug(GOSSIP_IO_DEBUG, "%s: actual_size=%lld\n", __func__,
823                 lld(js_p->actual_size));
824
825    /* FIXME: */
826    /* unless the second condition is set, the server starts
827       a new read request to one already done, and falls into
828       infinite trove_read->bmi_send->check_done loop */
829    if(!segpool_done(h) && s_op->u.pipeline.segs != 0) {
830        gossip_debug(GOSSIP_IO_DEBUG, "%s: LOOP\n", __func__);
831        js_p->error_code = LOOP;
832    }
833    else {
834        if(parent_s_op->u.io.op == (0x5800000f)) { /* KMEANS */
835            parent_s_op->u.io.tmp_buffer = (void *)s_op->u.pipeline.buffer;
836        }
837        gossip_debug(GOSSIP_IO_DEBUG, "%s: DONE\n", __func__);
838    }
839   
840    return SM_ACTION_COMPLETE;
841}
842
843/*
844 * Local variables:
845 *  mode: c
846 *  c-indent-level: 4
847 *  c-basic-offset: 4
848 * End:
849 *
850 * vim: ft=c ts=8 sts=4 sw=4 expandtab
851 */
Note: See TracBrowser for help on using the browser.