root/branches/as-branch/src/server/pipeline.sm @ 7826

Revision 7826, 24.7 KB (checked in by sson, 4 years ago)

Used PINT_sm_frame(smcb->parent_smcb, 0) to refer parent smcbs in pipeline.sm.
Removed possible double free() in io.sm.

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/*
8 *  PVFS2 server state machine for driving read/write I/O operations.
9 */
10#define _ISOC99_SOURCE /* log2() */
11
12#include <string.h>
13#include <assert.h>
14#include <stdlib.h>
15#include <math.h> /* log2() */
16
17#include "server-config.h"
18#include "pvfs2-server.h"
19#include "pvfs2-attr.h"
20#include "pvfs2-request.h"
21#include "pint-distribution.h"
22#include "pvfs2-dist-simple-stripe.h"
23#include "pint-request.h"
24#include "pvfs2-internal.h"
25#include "trove.h"
26#include "pint-util.h"
27#include "pint-cached-config.h"
28
29#define LOOP 101
30#define UNALIGNED 102
31#define DO_COMP 103
32#define DO_ALLREDUCE 104
33#define CONTINUE_ALLREDUCE 105
34
35static int s2s_comp_fn(
36    void *v_p, struct PVFS_server_resp *resp_p, int index);
37
38%%
39
40nested machine pvfs2_pipeline_sm
41{
42    state fetch
43    {
44        run fetch_data;
45        success => dispatch;
46    }
47
48    state dispatch
49    {
50        run dispatch_data;
51        DO_COMP => check_align;
52        success => check_pipeline;
53    }
54
55    state check_align
56    {
57        run check_align_fn;
58        UNALIGNED => setup_s2s;
59        success => do_comp;
60    }
61
62    state setup_s2s
63    {
64        run setup_s2s_msg;
65        success => s2s_exchange;
66    }
67
68    state s2s_exchange
69    {
70        jump pvfs2_msgpairarray_sm;
71        success => do_comp;
72    }
73
74    state do_comp
75    {
76        run do_comp_fn;
77        DO_ALLREDUCE => setup_allreduce;
78        success => check_pipeline;
79    }
80
81    state setup_allreduce
82    {
83        pjmp setup_allreduce_sm
84        {
85            success => pvfs2_allreduce_sm;
86        }
87        success => cleanup_allreduce;
88    }
89
90    state cleanup_allreduce
91    {
92        run cleanup_allreduce_fn;
93        success => check_pipeline;
94    }
95
96    state check_pipeline
97    {
98        run check_pipeline_done;
99        LOOP => fetch;
100        success => return;
101    }
102}
103
104%%
105
106/*
107 * fetch data from either TROVE (in case of READ) or BMI (in case of WRITE)
108 *
109 *   PINT_segpool_take_segments()
110 *     => READ: job_trove_bstream_read_list()
111 *     => WRITE: job_bmi_recv()
112 */
113static PINT_sm_action fetch_data(struct PINT_smcb *smcb, job_status_s *js_p)
114{
115    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
116    PINT_segpool_handle_t seg_handle = s_op->u.pipeline.seg_handle;
117    PINT_segpool_unit_id id = s_op->u.pipeline.id;
118    struct server_configuration_s *user_opts = get_server_config_struct();
119    int count, ret, i;
120    PVFS_offset *offsets;
121    PVFS_size *sizes;
122    PVFS_size bytes;
123    job_id_t tmp_id;
124
125    gossip_debug(GOSSIP_IO_DEBUG, "smcb->base_frame=%d, frame_count=%d\n", smcb->base_frame, smcb->frame_count);
126    s_op->u.pipeline.buffer_used = 0;
127    bytes = s_op->u.pipeline.buffer_size;
128
129    PINT_segpool_take_segments(seg_handle, id, &bytes, &count,
130                               &offsets, &sizes);
131    gossip_debug(GOSSIP_IO_DEBUG, "%s: %s: bytes=%lld, count=%d\n",
132                 __func__,
133                 (s_op->u.pipeline.io_type==PVFS_IO_READ?"READ":"WRITE"),
134                 lld(bytes), count);
135
136    for(i=0; i<count; i++) {
137        gossip_debug(GOSSIP_IO_DEBUG, "offsets[%d]=%lld, sizes[%d]=%lld\n",
138                     i, lld(offsets[i]), i, lld(sizes[i]));
139    }
140
141    if(count == 0) {
142        js_p->error_code = 0;
143        //gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
144        return SM_ACTION_COMPLETE;
145    }
146
147    s_op->u.pipeline.buffer_used = bytes;
148    s_op->u.pipeline.offsets = offsets;
149    s_op->u.pipeline.sizes = sizes;
150    s_op->u.pipeline.segs = count;
151
152    if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
153       
154        ret = job_trove_bstream_read_list
155            (s_op->u.pipeline.fs_id,
156             s_op->u.pipeline.handle,
157             (char **)&s_op->u.pipeline.buffer,
158             (PVFS_size *)&s_op->u.pipeline.buffer_used,
159             1,
160             offsets,
161             sizes,
162             count,
163             &s_op->u.pipeline.out_size,
164             s_op->u.pipeline.trove_sync_flag,
165             NULL,
166             smcb,
167             0,
168             js_p,
169             &tmp_id,
170             server_job_context,
171             s_op->u.pipeline.hints);
172    }
173    else if (s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
174        ret = job_bmi_recv(s_op->u.pipeline.address,
175                       (void *)s_op->u.pipeline.buffer,
176                       s_op->u.pipeline.buffer_size,
177                       s_op->u.pipeline.tag,
178                       BMI_PRE_ALLOC,
179                       smcb,
180                       0, /* unsigned long status_user_tag = 0 */
181                       js_p,
182                       &tmp_id,
183                       server_job_context,
184                       user_opts->server_job_flow_timeout,
185                       (bmi_hint)s_op->u.pipeline.hints);
186    }
187
188    if(ret < 0) {
189        gossip_err("%s: I/O error occurred\n", __func__);
190        /* FIXME */
191        //handle_io_error(ret, q_item, flow_data);
192        js_p->error_code = -PVFS_EIO;
193        return SM_ACTION_COMPLETE;
194    }
195
196    /* immediate return */
197    if(ret == 1) {
198        js_p->error_code = 1;
199        return SM_ACTION_COMPLETE;
200    }
201
202    if(ret == 0) {
203        js_p->error_code = 0;
204        return SM_ACTION_DEFERRED;
205    }
206
207    return SM_ACTION_COMPLETE;
208}
209
210/*
211 * Dispatch data to either BMI (in case of READ) or TROVE (in case of WRITE)
212 *
213 *   => READ: job_bmi_send()
214 *   => WRITE: job_trove_bstream_write_list()
215 */
216static PINT_sm_action dispatch_data(struct PINT_smcb *smcb, job_status_s *js_p)
217{
218    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
219    int ret;
220    job_id_t tmp_id;
221    struct server_configuration_s *user_opts = get_server_config_struct();
222    struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);
223
224    if(s_op->u.pipeline.segs == 0) {
225        js_p->error_code = 0;
226        gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
227        return SM_ACTION_COMPLETE;
228    }
229
230    gossip_debug(GOSSIP_IO_DEBUG, "%s: %s: buffer_used=%lld\n", __func__,
231                 (s_op->u.pipeline.io_type==PVFS_IO_READ?"READ":"WRITE"),
232                 lld(s_op->u.pipeline.buffer_used));
233#if 0
234    gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer[0]=%s\n", __func__,
235                 (char *)s_op->u.pipeline.buffer);
236#endif
237
238   
239    if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
240        assert(s_op->u.pipeline.buffer_used);
241
242        if(parent_s_op->u.io.op != 0) { /* AS: when op is specified */
243            ret = DO_COMP; /* AS: skip sending if op is specified */
244            gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
245            js_p->error_code = ret;
246            return SM_ACTION_COMPLETE;
247        }
248        else
249            ret = job_bmi_send(s_op->u.pipeline.address,
250                               s_op->u.pipeline.buffer,
251                               js_p->actual_size,
252                               s_op->u.pipeline.tag,
253                               BMI_PRE_ALLOC,
254                               0, /* send_unexpected */
255                               smcb, /* user_ptr */
256                               0, /* status_user_tag */
257                               js_p,
258                               &tmp_id,
259                               server_job_context,
260                               user_opts->server_job_bmi_timeout,
261                               (bmi_hint)s_op->u.pipeline.hints);
262       
263    }
264    else if(s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
265        ret = job_trove_bstream_write_list
266            (s_op->u.pipeline.fs_id,
267             s_op->u.pipeline.handle,
268             (char **)&s_op->u.pipeline.buffer,
269             (TROVE_size *)&js_p->actual_size,
270             1,
271             s_op->u.pipeline.offsets,
272             s_op->u.pipeline.sizes,
273             s_op->u.pipeline.segs,
274             &s_op->u.pipeline.out_size,
275             s_op->u.pipeline.trove_sync_flag,
276             NULL,
277             smcb,
278             0,
279             js_p,
280             &tmp_id,
281             server_job_context,
282             s_op->u.pipeline.hints);
283    }
284
285    if(ret < 0) {
286        gossip_err("%s: I/O error occurred\n", __func__);
287        /* FIXME !!!!!!! */
288        /* handle_io_error(ret, q_item, flow_data); */
289        js_p->error_code = ret;
290        return SM_ACTION_COMPLETE;
291    }
292   
293    /* immediate return */
294    if(ret == 1) {
295        js_p->error_code = ret;
296        return SM_ACTION_COMPLETE;
297    }
298       
299    if(ret == 0) {
300        js_p->error_code = ret;
301        return SM_ACTION_DEFERRED;
302    }
303
304    return SM_ACTION_COMPLETE;
305}
306
307
308static PINT_sm_action check_align_fn(struct PINT_smcb *smcb, job_status_s *js_p)
309{
310    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
311    struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);
312    js_p->error_code = 0;
313    PVFS_size file_req_offset = parent_s_op->u.io.file_req_offset;
314    PINT_request_file_data fdata = parent_s_op->u.io.file_data;
315    PVFS_simple_stripe_params *dparam =
316        (PVFS_simple_stripe_params*)fdata.dist->params;
317    PVFS_size count;
318    PVFS_offset strip_boundary;
319
320    PVFS_offset loff = fdata.dist->methods->physical_to_logical_offset(fdata.dist->params, &fdata, s_op->u.pipeline.offsets[0]);
321   
322    s_op->u.pipeline.loff = loff;
323    gossip_debug(GOSSIP_IO_DEBUG, "loff=%lld, file_req_offset=%lld\n", lld(loff), lld(file_req_offset));
324
325    switch(parent_s_op->u.io.datatype) {
326    case ((int)0x4c000405): /* MPI_INT */
327        count = (PVFS_size)(s_op->u.pipeline.buffer_used -
328                            file_req_offset)/((*PVFS_INT).ub);
329        gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(count));
330        strip_boundary = ((int)(loff/(dparam->strip_size)))*(dparam->strip_size);
331        s_op->u.pipeline.unaligned_size = loff-strip_boundary;
332       
333        if (loff == strip_boundary && file_req_offset != 0) {
334            s_op->u.pipeline.unaligned_size = file_req_offset;
335        }
336
337        if (s_op->u.pipeline.unaligned_size != 0 && count != 0) {
338            js_p->error_code = UNALIGNED;
339            gossip_debug(GOSSIP_IO_DEBUG, "unaligned_size=%lld\n",
340                         lld(s_op->u.pipeline.unaligned_size));
341        }
342    case ((int)0x4c00080b): /* MPI_DOUBLE */
343        count = (PVFS_size)(s_op->u.pipeline.buffer_used -
344                            file_req_offset)/((*PVFS_DOUBLE).ub);
345        gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(count));
346        strip_boundary = ((int)(loff/(dparam->strip_size)))*(dparam->strip_size); /* FIXME */
347        s_op->u.pipeline.unaligned_size = loff-strip_boundary;
348
349        if (loff == strip_boundary && file_req_offset != 0) {
350            s_op->u.pipeline.unaligned_size = file_req_offset;
351        }
352       
353        if (s_op->u.pipeline.unaligned_size != 0 && count != 0) {
354            js_p->error_code = UNALIGNED;
355            gossip_debug(GOSSIP_IO_DEBUG, "unaligned_size=%lld\n",
356                         lld(s_op->u.pipeline.unaligned_size));
357        }
358    }
359
360    return SM_ACTION_COMPLETE;
361}
362
363
364static PINT_sm_action setup_s2s_msg(struct PINT_smcb *smcb, job_status_s *js_p)
365{
366    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
367    struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);
368    PINT_sm_msgpair_state *msg_p = NULL;
369    struct server_configuration_s *user_opts = get_server_config_struct();
370    int regions;
371    int ret;
372    PVFS_credentials creds;
373    int next_server_index;
374    PVFS_handle next_server_handle;
375
376    /* init msgpair */
377    PINT_msgpair_init(&s_op->msgarray_op);
378    msg_p = &s_op->msgarray_op.msgpair;
379
380    s_op->msgarray_op.params.job_timeout = user_opts->client_job_bmi_timeout;
381    s_op->msgarray_op.params.retry_delay = user_opts->client_retry_delay_ms;
382    s_op->msgarray_op.params.retry_limit = user_opts->client_retry_limit;
383    s_op->msgarray_op.params.quiet_flag = 1;
384
385    PINT_util_gen_credentials(&creds);
386    gossip_debug(GOSSIP_IO_DEBUG, "loff=%lld, buffer_used=%lld\n", lld(s_op->u.pipeline.loff), lld(s_op->u.pipeline.buffer_used));
387
388    /* determine which server we need to talk to */
389    next_server_index = (s_op->u.pipeline.dfile_index + 1)%(s_op->u.pipeline.dfile_count);
390    next_server_handle = parent_s_op->u.io.dfile_array[next_server_index];
391
392    /* build a request */
393    ret = PVFS_Request_contiguous(s_op->u.pipeline.unaligned_size,
394                                  PVFS_BYTE, &s_op->u.pipeline.file_req);
395   
396    s_op->u.pipeline.file_req_offset = (((int)(s_op->u.pipeline.loff/262144))+1)*262144; /* FIXME */
397   
398    regions = 1;
399    gossip_debug(GOSSIP_IO_DEBUG, "s_op->u.pipeline.file_req_offset=%lld\n", lld(s_op->u.pipeline.file_req_offset));
400
401    PINT_SERVREQ_SMALL_IO_FILL(msg_p->req,
402                               creds,
403                               s_op->u.pipeline.fs_id,
404                               next_server_handle,
405                               s_op->u.pipeline.io_type,
406                               next_server_index,
407                               s_op->u.pipeline.dfile_count,
408                               s_op->u.pipeline.dist,
409                               s_op->u.pipeline.file_req,
410                               s_op->u.pipeline.file_req_offset,
411                               regions,
412                               s_op->u.pipeline.unaligned_size,
413                               NULL /* s_op->hints */);
414
415    msg_p->fs_id = s_op->u.pipeline.fs_id;
416    msg_p->handle = next_server_handle;
417    msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
418    msg_p->comp_fn = s2s_comp_fn;
419
420    ret = PINT_cached_config_map_to_server(&msg_p->svr_addr,
421                                           next_server_handle, //s_op->u.pipeline.handle,
422                                           s_op->u.pipeline.fs_id);
423    gossip_debug(GOSSIP_IO_DEBUG, "%s: msg_p->svr_addr=%llu\n", __func__,
424                 llu(msg_p->svr_addr));
425    if(ret < 0) {
426        gossip_err("Failed to map meta server address\n");
427        js_p->error_code = ret;
428        return SM_ACTION_COMPLETE;
429    }
430
431    js_p->error_code = 0;
432
433    PINT_sm_push_frame(smcb, 0, &s_op->msgarray_op);
434    return SM_ACTION_COMPLETE;
435}
436
437static int s2s_comp_fn(void *v_p, struct PVFS_server_resp *resp_p,
438                        int index)
439{
440    PINT_smcb *smcb = v_p;
441    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
442
443    gossip_debug(GOSSIP_IO_DEBUG, "%s called\n", __func__);
444    gossip_debug(GOSSIP_IO_DEBUG, "resp_p->status=%d\n", resp_p->status);
445    gossip_debug(GOSSIP_IO_DEBUG, "small_io: result_size=%lld\n",
446                     lld(resp_p->u.small_io.result_size));
447
448    assert(resp_p->op == PVFS_SERV_SMALL_IO);
449
450    if (resp_p->status != 0) {
451        return resp_p->status;
452    }
453
454    if(resp_p->u.small_io.result_size != 0) {
455        memcpy(s_op->u.pipeline.tmp_buf, resp_p->u.small_io.buffer,
456               resp_p->u.small_io.result_size);
457    }
458
459    return 0;
460}
461
462/* square of Euclid distance between two multi-dimensional points            */
463__inline static
464float euclid_dist_2(int    numdims,  /* no. dimensions */
465                    float *coord1,   /* [numdims] */
466                    float *coord2)   /* [numdims] */
467{
468  int i;
469  float ans=0.0;
470
471  for (i=0; i<numdims; i++)
472    ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]);
473
474  return(ans);
475}
476
477/*----< find_nearest_cluster() >---------------------------------------------*/
478__inline static
479int find_nearest_cluster(int     numClusters, /* no. clusters */
480                         int     numCoords,   /* no. coordinates */
481                         float  *object,      /* [numCoords] */
482                         float **clusters)    /* [numClusters][numCoords] */
483{
484  int   index, i;
485  float dist, min_dist;
486
487  /* find the cluster id that has min distance to object */
488  index    = 0;
489  min_dist = euclid_dist_2(numCoords, object, clusters[0]);
490
491  for (i=1; i<numClusters; i++) {
492    dist = euclid_dist_2(numCoords, object, clusters[i]);
493    /* no need square root */
494    if (dist < min_dist) { /* find the min and its array index */
495      min_dist = dist;
496      index    = i;
497    }
498  }
499  return(index);
500}
501
502static PINT_sm_action do_comp_fn(struct PINT_smcb *smcb, job_status_s *js_p)
503{
504    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
505    struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);
506    js_p->error_code = 0;
507
508    if(s_op->u.pipeline.buffer) {
509        PVFS_size i;
510        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
511                     "%s: buffer_used=%lld, op=0x%x, datatype=0x%x, actual_size=%lld\n",
512                     __func__, lld(s_op->u.pipeline.buffer_used),
513                     parent_s_op->u.io.op,
514                     parent_s_op->u.io.datatype,
515                     lld(js_p->actual_size)); /* AS */
516
517        switch(parent_s_op->u.io.datatype) {
518        case ((int)0x4c000405): /* MPI_INT */
519            {
520                int *a = (int*)s_op->u.pipeline.buffer;
521                int result;
522                PVFS_size count = (s_op->u.pipeline.buffer_used-s_op->u.pipeline.unaligned_size)/((*PVFS_INT).ub);
523                int *tmp;
524
525                if(count == 0)
526                    return SM_ACTION_COMPLETE;
527                /* data is not aligned perfectly, so adjust it */
528                if(s_op->u.pipeline.unaligned_size != 0) {
529                    memcpy(((char*)&a[count])+(((*PVFS_INT).ub)-s_op->u.pipeline.unaligned_size), s_op->u.pipeline.tmp_buf, s_op->u.pipeline.unaligned_size);
530                    count++;
531                }
532               
533                if (parent_s_op->u.io.total_transferred == 0) {
534                    if (parent_s_op->u.io.tmp_buffer == NULL)
535                        parent_s_op->u.io.tmp_buffer = (void *)malloc(1*sizeof(int));
536                    memset(parent_s_op->u.io.tmp_buffer, 0, sizeof(int));
537                    parent_s_op->u.io.count = 0;
538                }
539                tmp = parent_s_op->u.io.tmp_buffer;
540                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "total_transferred=%lld\n", lld(parent_s_op->u.io.total_transferred));
541                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%d\n", lld(count), *tmp);
542
543                switch(parent_s_op->u.io.op) {
544                case 0x58000001: /* MAX */
545                    result = *a;
546                    for (i=1; i<count; i++ ) {
547                        if (a[i] > result) {
548                            result = a[i];
549                        }
550                    }
551                    a[0] = result;
552                    if (parent_s_op->u.io.total_transferred == 0 ||
553                        result > *tmp)
554                        *tmp = result;
555                    break;
556                case 0x58000002: /* MIN */
557                    result = *a;
558                    for (i=1; i<count; i++ ) {
559                        if (a[i] < result) {
560                            result = a[i];
561                        }
562                    }
563                    a[0] = result;
564                    if (parent_s_op->u.io.total_transferred == 0 ||
565                        result < *tmp)
566                        *tmp = result;
567                    break;
568                case 0x58000003: /* SUM */
569                    result = 0;
570                    for (i=0; i<count; i++ ) {
571                        if (i<10)
572                            gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%d\n",
573                                         lld(i), a[i]);
574                        result += a[i];
575                    }
576                    a[0] = result;
577                    *tmp += result;
578                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%d\n",
579                                 *tmp);
580                    break;
581                case (0x5800000e): /* MEAN */
582                    result = 0;
583                    for (i=0; i<count; i++) {
584                        result += a[i];
585                    }
586                    result = result/count;
587                    a[0] = result;
588                   
589                    if (parent_s_op->u.io.count == 0)
590                        *tmp = result;
591                    else {
592                        int tmp_sum = (*tmp)*(parent_s_op->u.io.count);
593                        tmp_sum = tmp_sum + result*count;
594                        *tmp = (tmp_sum)/(parent_s_op->u.io.count+count);
595                    }
596                    parent_s_op->u.io.count += count;
597                    gossip_debug(GOSSIP_IO_DEBUG, "mean=%d\n", *tmp);
598                    break;
599                default:
600                    break;
601                }
602                s_op->u.pipeline.buffer = (void *)a;
603                parent_s_op->u.io.tmp_buffer = (void *)tmp;
604            }
605            break;
606
607        case ((int)0x4c00080b): /* MPI_DOUBLE */
608            {
609                double *a = (double*)s_op->u.pipeline.buffer;
610                double result;
611                PVFS_size count = (s_op->u.pipeline.buffer_used-s_op->u.pipeline.unaligned_size)/((*PVFS_DOUBLE).ub);
612                double *tmp;
613                PVFS_offset strip_boundary = (int)(s_op->u.pipeline.loff/262144)*262144; /* FIXME */
614
615                if (s_op->u.pipeline.buffer_used < 262144 &&
616                    s_op->u.pipeline.loff != strip_boundary) { /* FIXME */
617                    count = (s_op->u.pipeline.buffer_used)/((*PVFS_DOUBLE).ub);
618                }
619                gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(count));
620                if(count < 1)
621                    return SM_ACTION_COMPLETE;
622
623                /* data is not aligned perfectly, so adjust it within the memory*/
624                if(s_op->u.pipeline.unaligned_size != 0) {
625                    PVFS_size adj_sz = s_op->u.pipeline.unaligned_size;
626                    PVFS_size tmp_sz = ((*PVFS_DOUBLE).ub) - adj_sz;
627                   
628                    if(s_op->u.pipeline.loff == strip_boundary) {
629                        memcpy(a, (char*)&a[0]+adj_sz,
630                               s_op->u.pipeline.buffer_used-adj_sz);
631                    }
632
633                    memcpy(((char*)&a[count])+tmp_sz, s_op->u.pipeline.tmp_buf,
634                           s_op->u.pipeline.unaligned_size);
635
636
637                    count++;
638                   
639                    if(s_op->u.pipeline.buffer_used < (262144-((*PVFS_DOUBLE).ub)))
640                        count--;
641                }
642
643                if (parent_s_op->u.io.total_transferred == 0) {
644                    if (parent_s_op->u.io.tmp_buffer == NULL)
645                        parent_s_op->u.io.tmp_buffer = (void *)malloc(1*sizeof(double));
646                    memset( parent_s_op->u.io.tmp_buffer, 0, sizeof(double));
647                    parent_s_op->u.io.count = 0;
648                }
649                tmp = parent_s_op->u.io.tmp_buffer;
650                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
651                             "total_transferred=%lld\n",
652                             lld(parent_s_op->u.io.total_transferred));
653                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%lf\n",
654                             lld(count), *tmp);
655                switch(parent_s_op->u.io.op) {
656                case 0x58000001: /* MAX */
657                    result = *a;
658                    for (i=1; i<count; i++ ) {
659                        if (a[i] > result) {
660                            result = a[i];
661                        }
662                    }
663                    a[0] = result;
664                    if (parent_s_op->u.io.total_transferred == 0 ||
665                        result > *tmp)
666                        *tmp = result;
667                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "max=%lf\n", *tmp);
668                    break;
669                case 0x58000002: /* MIN */
670                    result = *a;
671                    for (i=1; i<count; i++ ) {
672                        if (a[i] < result) {
673                            result = a[i];
674                        }
675                    }
676
677                    a[0] = result;
678                    if (parent_s_op->u.io.total_transferred == 0 ||
679                        result < *tmp)
680                        *tmp = result;
681                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "min=%lf\n",
682                                 *tmp);
683                    break;
684                case 0x58000003: /* SUM */
685                    result = 0;
686                    for (i=0; i<count; i++ ) {
687                        if (i<10 || i== (count-1) || i == count) gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%lf\n", lld(i), a[i]);
688                        result += a[i];
689                    }
690
691                    a[0] = result;
692                    *tmp += result;
693                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%lf\n",
694                                 *tmp);
695                    break;   
696                case (0x5800000e): /* MEAN */
697                    result = 0;
698                    for (i=0; i<count; i++) {
699                        result += a[i];
700                    }
701                    result = result/count;
702                    a[0] = result;
703
704                    gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(parent_s_op->u.io.count));
705
706                    if (parent_s_op->u.io.count == 0)
707                        *tmp = result;
708                    else {
709                        double tmp_sum = (*tmp)*(parent_s_op->u.io.count);
710                        tmp_sum = tmp_sum + result*count;
711                        *tmp = (tmp_sum)/(parent_s_op->u.io.count+count);
712                    }
713                    parent_s_op->u.io.count += count;
714                    gossip_debug(GOSSIP_IO_DEBUG, "mean=%lf\n", result);
715                    break;
716                case (0x5800000f): /* KMEANS */ {
717                    int index, numObjs, j;
718                    int numClusters=2, numCoords=4;
719                    double delta;
720                   
721#if 0
722                    for(i=0; i<numObjs; i++) {
723                        //result = a[i];
724                        index = find_nearest_cluster(numClusters, numCoords, a[i], clusters);
725                        if(membership[i] != index) delta += 1.0;
726                        membership[i] = index;
727
728                        newClusterSize[index]++;
729                        for(j=0; j<numCoords; j++)
730                            newClusters[index][j] += objects[i][j];
731                    }
732#endif
733                    js_p->error_code = DO_ALLREDUCE;
734                    break;
735                }
736                default:
737                    break;
738                } /* end inner switch */
739                s_op->u.pipeline.buffer = (void *)a;
740                parent_s_op->u.io.tmp_buffer = (void *)tmp;
741            }
742           
743            break;
744        default:
745            break;
746        } /* end switch() */
747    } /* end if() */
748
749    return SM_ACTION_COMPLETE;
750}
751
752static PINT_sm_action setup_allreduce_sm(struct PINT_smcb *smcb, job_status_s *js_p)
753{
754    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
755    struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);
756    int ret, tmp_id;
757    struct server_configuration_s *user_opts = get_server_config_struct();
758    PVFS_handle new_rank_handle;
759    PVFS_BMI_addr_t svr_addr;
760    struct PINT_server_op *allreduce_op;
761    int i;
762
763    allreduce_op = malloc(sizeof(*allreduce_op));
764    memset(allreduce_op, 0, sizeof(*allreduce_op));
765    allreduce_op->u.allreduce.fs_id = s_op->u.pipeline.fs_id;
766    allreduce_op->u.allreduce.myRank = s_op->u.pipeline.dfile_index;
767    allreduce_op->u.allreduce.recv_buf = (void*)malloc(5*sizeof(double));
768    memset(allreduce_op->u.allreduce.recv_buf, 0, 5*sizeof(double));
769    allreduce_op->u.allreduce.tree_depth = log2(s_op->u.pipeline.dfile_count); /* FIXME !!!! */
770    gossip_debug(GOSSIP_IO_DEBUG, "tree_depth=%d\n",
771                 allreduce_op->u.allreduce.tree_depth);
772    allreduce_op->u.allreduce.current_depth = 0;
773    allreduce_op->u.allreduce.mask = 0x1;
774    allreduce_op->u.allreduce.dfile_array = parent_s_op->u.io.dfile_array;
775    allreduce_op->u.allreduce.send_buf = (void*)malloc(5*sizeof(double));
776    memset(allreduce_op->u.allreduce.send_buf, 0, 5*sizeof(double));
777
778    ret = PINT_sm_push_frame(smcb, 0, allreduce_op);
779   
780    js_p->error_code = 0;
781    return SM_ACTION_COMPLETE;
782}
783
784static PINT_sm_action cleanup_allreduce_fn(struct PINT_smcb *smcb, job_status_s *js_p)
785{
786    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
787    int task_id;
788    int remaining;
789    PVFS_error tmp_err;
790    struct PINT_server_op *allreduce_op;
791
792    allreduce_op = PINT_sm_pop_frame(smcb, &task_id, &tmp_err,
793            &remaining);
794    gossip_debug(GOSSIP_SERVER_DEBUG,
795                 "pipeline: nested sm returned error code: %d\n", tmp_err);
796    memcpy(s_op->u.pipeline.buffer, allreduce_op->u.allreduce.send_buf,
797           5*sizeof(double));
798    free(allreduce_op->u.allreduce.send_buf);
799    free(allreduce_op->u.allreduce.recv_buf);
800    free(allreduce_op);
801
802    js_p->error_code = 0;
803    return SM_ACTION_COMPLETE;
804}
805
806static PINT_sm_action check_pipeline_done(struct PINT_smcb *smcb, job_status_s *js_p)
807{
808    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
809    struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);
810    PINT_segpool_handle_t h = s_op->u.pipeline.seg_handle;
811    js_p->error_code = 0;
812
813    /* FIMXE: do we really need this lock? */
814    gen_mutex_lock(&parent_s_op->u.io.mutex);
815    //parent_s_op->u.io.total_transferred += js_p->actual_size;
816    parent_s_op->u.io.total_transferred += s_op->u.pipeline.buffer_used;
817    gen_mutex_unlock(&parent_s_op->u.io.mutex);
818   
819    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%lld\n", __func__,
820                 lld(parent_s_op->u.io.total_transferred));
821    gossip_debug(GOSSIP_IO_DEBUG, "%s: actual_size=%lld\n", __func__,
822                 lld(js_p->actual_size));
823
824    /* FIXME: */
825    /* unless the second condition is set, the server starts
826       a new read request to one already done, and falls into
827       infinite trove_read->bmi_send->check_done loop */
828    if(!segpool_done(h) && s_op->u.pipeline.segs != 0) {
829        gossip_debug(GOSSIP_IO_DEBUG, "%s: LOOP\n", __func__);
830        js_p->error_code = LOOP;
831    }
832    else {
833        if(parent_s_op->u.io.op == (0x5800000f)) { /* KMEANS */
834            parent_s_op->u.io.tmp_buffer = (void *)s_op->u.pipeline.buffer;
835        }
836        gossip_debug(GOSSIP_IO_DEBUG, "%s: DONE\n", __func__);
837    }
838   
839    return SM_ACTION_COMPLETE;
840}
841
842/*
843 * Local variables:
844 *  mode: c
845 *  c-indent-level: 4
846 *  c-basic-offset: 4
847 * End:
848 *
849 * vim: ft=c ts=8 sts=4 sw=4 expandtab
850 */
Note: See TracBrowser for help on using the browser.