root/branches/as-branch/src/server/pipeline.sm @ 7853

Revision 7853, 26.7 KB (checked in by sson, 4 years ago)

Fixed the incorrect allreduce operation when there is only one server
by skipping entering allreduce.sm.

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/*
8 *  PVFS2 server state machine for driving read/write I/O operations.
9 */
10#define _ISOC99_SOURCE /* log2() */
11
12#include <string.h>
13#include <assert.h>
14#include <stdlib.h>
15#include <math.h> /* log2() */
16
17#include "server-config.h"
18#include "pvfs2-server.h"
19#include "pvfs2-attr.h"
20#include "pvfs2-request.h"
21#include "pint-distribution.h"
22#include "pvfs2-dist-simple-stripe.h"
23#include "pint-request.h"
24#include "pvfs2-internal.h"
25#include "trove.h"
26#include "pint-util.h"
27#include "pint-cached-config.h"
28
29#define LOOP 101
30#define UNALIGNED 102
31#define DO_COMP 103
32#define DO_ALLREDUCE 104
33#define DO_KMEANS 105
34
35static int s2s_comp_fn(
36    void *v_p, struct PVFS_server_resp *resp_p, int index);
37
38%%
39
40nested machine pvfs2_pipeline_sm
41{
42    state fetch
43    {
44        run fetch_data;
45        success => dispatch;
46    }
47
48    state dispatch
49    {
50        run dispatch_data;
51        DO_COMP => check_align;
52        default => check_pipeline;
53    }
54
55    state check_align
56    {
57        run check_align_fn;
58        UNALIGNED => setup_s2s;
59        success => do_comp;
60    }
61
62    state setup_s2s
63    {
64        run setup_s2s_msg;
65        success => s2s_exchange;
66    }
67
68    state s2s_exchange
69    {
70        jump pvfs2_msgpairarray_sm;
71        success => do_comp;
72    }
73
74    state do_comp
75    {
76        run do_comp_fn;
77        DO_KMEANS => setup_kmeans;
78        success => check_pipeline;
79    }
80
81    state setup_kmeans
82    {
83        run setup_kmeans_fn;
84        success => run_kmeans;
85    }
86
87    state run_kmeans
88    {
89        jump pvfs2_kmeans_sm;
90        success => cleanup_kmeans;
91    }
92
93    state cleanup_kmeans
94    {
95        run cleanup_kmeans_fn;
96        success => check_pipeline;
97    }
98
99    state check_pipeline
100    {
101        run check_pipeline_done;
102        LOOP => fetch;
103        success => return;
104    }
105}
106
107%%
108
109/*
110 * fetch data from either TROVE (in case of READ) or BMI (in case of WRITE)
111 *
112 *   PINT_segpool_take_segments()
113 *     => READ: job_trove_bstream_read_list()
114 *     => WRITE: job_bmi_recv()
115 */
116static PINT_sm_action fetch_data(struct PINT_smcb *smcb, job_status_s *js_p)
117{
118    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
119    PINT_segpool_handle_t seg_handle = s_op->u.pipeline.seg_handle;
120    PINT_segpool_unit_id id = s_op->u.pipeline.id;
121    struct server_configuration_s *user_opts = get_server_config_struct();
122    int count, ret, i;
123    PVFS_offset *offsets;
124    PVFS_size *sizes;
125    PVFS_size bytes;
126    job_id_t tmp_id;
127
128    gossip_debug(GOSSIP_IO_DEBUG, "smcb->base_frame=%d, frame_count=%d\n",
129                 smcb->base_frame, smcb->frame_count);
130    s_op->u.pipeline.buffer_used = 0;
131    bytes = s_op->u.pipeline.buffer_size;
132
133    PINT_segpool_take_segments(seg_handle, id, &bytes, &count,
134                               &offsets, &sizes);
135    gossip_debug(GOSSIP_IO_DEBUG, "%s: %s: bytes=%lld, count=%d\n",
136                 __func__,
137                 (s_op->u.pipeline.io_type==PVFS_IO_READ?"READ":"WRITE"),
138                 lld(bytes), count);
139
140    for(i=0; i<count; i++) {
141        gossip_debug(GOSSIP_IO_DEBUG, "offsets[%d]=%lld, sizes[%d]=%lld\n",
142                     i, lld(offsets[i]), i, lld(sizes[i]));
143    }
144
145    if(count == 0) {
146        js_p->error_code = 0;
147        //gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
148        return SM_ACTION_COMPLETE;
149    }
150
151    s_op->u.pipeline.buffer_used = bytes;
152    s_op->u.pipeline.offsets = offsets;
153    s_op->u.pipeline.sizes = sizes;
154    s_op->u.pipeline.segs = count;
155
156    if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
157       
158        ret = job_trove_bstream_read_list
159            (s_op->u.pipeline.fs_id,
160             s_op->u.pipeline.handle,
161             (char **)&s_op->u.pipeline.buffer,
162             (PVFS_size *)&s_op->u.pipeline.buffer_used,
163             1,
164             offsets,
165             sizes,
166             count,
167             &s_op->u.pipeline.out_size,
168             s_op->u.pipeline.trove_sync_flag,
169             NULL,
170             smcb,
171             0,
172             js_p,
173             &tmp_id,
174             server_job_context,
175             s_op->u.pipeline.hints);
176    }
177    else if (s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
178        ret = job_bmi_recv(s_op->u.pipeline.address,
179                       (void *)s_op->u.pipeline.buffer,
180                       s_op->u.pipeline.buffer_size,
181                       s_op->u.pipeline.tag,
182                       BMI_PRE_ALLOC,
183                       smcb,
184                       0, /* unsigned long status_user_tag = 0 */
185                       js_p,
186                       &tmp_id,
187                       server_job_context,
188                       user_opts->server_job_flow_timeout,
189                       (bmi_hint)s_op->u.pipeline.hints);
190    }
191
192    if(ret < 0) {
193        gossip_err("%s: I/O error occurred\n", __func__);
194        /* FIXME */
195        //handle_io_error(ret, q_item, flow_data);
196        js_p->error_code = -PVFS_EIO;
197        return SM_ACTION_COMPLETE;
198    }
199
200    /* immediate return */
201    if(ret == 1) {
202        js_p->error_code = 1;
203        return SM_ACTION_COMPLETE;
204    }
205
206    if(ret == 0) {
207        js_p->error_code = 0;
208        return SM_ACTION_DEFERRED;
209    }
210
211    return SM_ACTION_COMPLETE;
212}
213
214/*
215 * Dispatch data to either BMI (in case of READ) or TROVE (in case of WRITE)
216 *
217 *   => READ: job_bmi_send()
218 *   => WRITE: job_trove_bstream_write_list()
219 */
220static PINT_sm_action dispatch_data(struct PINT_smcb *smcb, job_status_s *js_p)
221{
222    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
223    int ret;
224    job_id_t tmp_id;
225    struct server_configuration_s *user_opts = get_server_config_struct();
226
227    if(s_op->u.pipeline.segs == 0) {
228        js_p->error_code = 0;
229        gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
230        return SM_ACTION_COMPLETE;
231    }
232
233    gossip_debug(GOSSIP_IO_DEBUG, "%s: %s: buffer_used=%lld\n", __func__,
234                 (s_op->u.pipeline.io_type==PVFS_IO_READ?"READ":"WRITE"),
235                 lld(s_op->u.pipeline.buffer_used));
236#if 0
237    gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer[0]=%s\n", __func__,
238                 (char *)s_op->u.pipeline.buffer);
239#endif
240
241   
242    if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
243        assert(s_op->u.pipeline.buffer_used);
244
245        if(s_op->u.pipeline.op != 0) { /* AS: when op is specified */
246            ret = DO_COMP; /* AS: skip sending if op is specified */
247            gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
248            js_p->error_code = ret;
249            return SM_ACTION_COMPLETE;
250        }
251        else
252            ret = job_bmi_send(s_op->u.pipeline.address,
253                               s_op->u.pipeline.buffer,
254                               js_p->actual_size,
255                               s_op->u.pipeline.tag,
256                               BMI_PRE_ALLOC,
257                               0, /* send_unexpected */
258                               smcb, /* user_ptr */
259                               0, /* status_user_tag */
260                               js_p,
261                               &tmp_id,
262                               server_job_context,
263                               user_opts->server_job_bmi_timeout,
264                               (bmi_hint)s_op->u.pipeline.hints);
265       
266    }
267    else if(s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
268        ret = job_trove_bstream_write_list
269            (s_op->u.pipeline.fs_id,
270             s_op->u.pipeline.handle,
271             (char **)&s_op->u.pipeline.buffer,
272             (TROVE_size *)&js_p->actual_size,
273             1,
274             s_op->u.pipeline.offsets,
275             s_op->u.pipeline.sizes,
276             s_op->u.pipeline.segs,
277             &s_op->u.pipeline.out_size,
278             s_op->u.pipeline.trove_sync_flag,
279             NULL,
280             smcb,
281             0,
282             js_p,
283             &tmp_id,
284             server_job_context,
285             s_op->u.pipeline.hints);
286    }
287
288    if(ret < 0) {
289        gossip_err("%s: I/O error occurred\n", __func__);
290        /* FIXME !!!!!!! */
291        /* handle_io_error(ret, q_item, flow_data); */
292        js_p->error_code = ret;
293        return SM_ACTION_COMPLETE;
294    }
295   
296    /* immediate return */
297    if(ret == 1) {
298        js_p->error_code = ret;
299        return SM_ACTION_COMPLETE;
300    }
301       
302    if(ret == 0) {
303        js_p->error_code = ret;
304        return SM_ACTION_DEFERRED;
305    }
306
307    return SM_ACTION_COMPLETE;
308}
309
310
311static PINT_sm_action check_align_fn(struct PINT_smcb *smcb, job_status_s *js_p)
312{
313    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
314    js_p->error_code = 0;
315    PVFS_size file_req_offset = s_op->u.pipeline.file_req_offset;
316    PINT_request_file_data fdata = s_op->u.pipeline.file_data;
317    PVFS_simple_stripe_params *dparam =
318        (PVFS_simple_stripe_params*)fdata.dist->params;
319    PVFS_size count;
320    PVFS_offset strip_boundary;
321
322    PVFS_offset loff = fdata.dist->methods->physical_to_logical_offset(fdata.dist->params, &fdata, s_op->u.pipeline.offsets[0]);
323   
324    s_op->u.pipeline.loff = loff;
325    gossip_debug(GOSSIP_IO_DEBUG, "loff=%lld, file_req_offset=%lld\n", lld(loff), lld(file_req_offset));
326
327    switch(s_op->u.pipeline.datatype) {
328    case ((int)0x4c000405): /* MPI_INT */
329        count = (PVFS_size)(s_op->u.pipeline.buffer_used -
330                            file_req_offset)/((*PVFS_INT).ub);
331        gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(count));
332        strip_boundary = ((int)(loff/(dparam->strip_size)))*(dparam->strip_size);
333        s_op->u.pipeline.unaligned_size = loff-strip_boundary;
334       
335        if (loff == strip_boundary && file_req_offset != 0) {
336            s_op->u.pipeline.unaligned_size = file_req_offset;
337        }
338
339        if (s_op->u.pipeline.unaligned_size != 0 && count != 0) {
340            js_p->error_code = UNALIGNED;
341            gossip_debug(GOSSIP_IO_DEBUG, "unaligned_size=%lld\n",
342                         lld(s_op->u.pipeline.unaligned_size));
343        }
344        break;
345    case ((int)0x4c00040a): /* MPI_FLOAT */
346        count = (PVFS_size)(s_op->u.pipeline.buffer_used)/((*PVFS_FLOAT).ub);
347        if(s_op->u.pipeline.op == 0x5800000f) { /* KMEANS */
348            count = (PVFS_size)(s_op->u.pipeline.buffer_used)/((*PVFS_FLOAT).ub)/18; /* FIXME: 18 is numCoords */
349            s_op->u.pipeline.unaligned_size = 0; /* FIXME */
350            gossip_debug(GOSSIP_IO_DEBUG, "FLOAT: count=%lld\n", lld(count));
351        }
352        gossip_debug(GOSSIP_IO_DEBUG, "strip_size=%ld, count=%lld\n",
353                     dparam->strip_size, lld(count));
354        //strip_boundary = ((int)(loff/(dparam->strip_size)))*(dparam->strip_size);
355        //s_op->u.pipeline.unaligned_size = loff-strip_boundary;
356        if(dparam->strip_size > (count*(*PVFS_FLOAT).ub)) {
357            s_op->u.pipeline.unaligned_size = dparam->strip_size -
358                (count*((*PVFS_FLOAT).ub));
359            if(s_op->u.pipeline.op == 0x5800000f)
360                s_op->u.pipeline.unaligned_size = dparam->strip_size
361                    - (count*((*PVFS_FLOAT).ub)*18);
362        }
363#if 0   
364        if (loff == strip_boundary && file_req_offset != 0) {
365            s_op->u.pipeline.unaligned_size = file_req_offset;
366        }
367#endif
368        if (s_op->u.pipeline.unaligned_size != 0 && count != 0) {
369            js_p->error_code = UNALIGNED;
370            gossip_debug(GOSSIP_IO_DEBUG, "unaligned_size=%lld\n",
371                         lld(s_op->u.pipeline.unaligned_size));
372        }
373        break;
374    case ((int)0x4c00080b): /* MPI_DOUBLE */
375        count = (PVFS_size)(s_op->u.pipeline.buffer_used -
376                            file_req_offset)/((*PVFS_DOUBLE).ub);
377        gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(count));
378        strip_boundary = ((int)(loff/(dparam->strip_size)))*(dparam->strip_size); /* FIXME */
379        s_op->u.pipeline.unaligned_size = loff-strip_boundary;
380
381        if (loff == strip_boundary && file_req_offset != 0) {
382            s_op->u.pipeline.unaligned_size = file_req_offset;
383        }
384       
385        if (s_op->u.pipeline.unaligned_size != 0 && count != 0) {
386            js_p->error_code = UNALIGNED;
387            gossip_debug(GOSSIP_IO_DEBUG, "unaligned_size=%lld\n",
388                         lld(s_op->u.pipeline.unaligned_size));
389        }
390        break;
391    }
392
393    return SM_ACTION_COMPLETE;
394}
395
396
397static PINT_sm_action setup_s2s_msg(struct PINT_smcb *smcb, job_status_s *js_p)
398{
399    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
400    PINT_sm_msgpair_state *msg_p = NULL;
401    struct server_configuration_s *user_opts = get_server_config_struct();
402    int regions;
403    int ret;
404    PVFS_credentials creds;
405    int next_server_index;
406    PVFS_handle next_server_handle;
407
408    /* init msgpair */
409    PINT_msgpair_init(&s_op->msgarray_op);
410    msg_p = &s_op->msgarray_op.msgpair;
411
412    s_op->msgarray_op.params.job_timeout = user_opts->client_job_bmi_timeout;
413    s_op->msgarray_op.params.retry_delay = user_opts->client_retry_delay_ms;
414    s_op->msgarray_op.params.retry_limit = user_opts->client_retry_limit;
415    s_op->msgarray_op.params.quiet_flag = 1;
416
417    PINT_util_gen_credentials(&creds);
418    gossip_debug(GOSSIP_IO_DEBUG, "loff=%lld, buffer_used=%lld\n", lld(s_op->u.pipeline.loff), lld(s_op->u.pipeline.buffer_used));
419
420    /* determine which server we need to talk to */
421    next_server_index = (s_op->u.pipeline.dfile_index + 1)%(s_op->u.pipeline.dfile_count);
422    next_server_handle = s_op->u.pipeline.dfile_array[next_server_index];
423
424    /* build a request */
425    ret = PVFS_Request_contiguous(s_op->u.pipeline.unaligned_size,
426                                  PVFS_BYTE, &s_op->u.pipeline.file_req);
427   
428    s_op->u.pipeline.file_req_offset = (((int)(s_op->u.pipeline.loff/262144))+1)*262144; /* FIXME */
429   
430    regions = 1;
431    gossip_debug(GOSSIP_IO_DEBUG, "s_op->u.pipeline.file_req_offset=%lld\n", lld(s_op->u.pipeline.file_req_offset));
432
433    PINT_SERVREQ_SMALL_IO_FILL(msg_p->req,
434                               creds,
435                               s_op->u.pipeline.fs_id,
436                               next_server_handle,
437                               s_op->u.pipeline.io_type,
438                               next_server_index,
439                               s_op->u.pipeline.dfile_count,
440                               s_op->u.pipeline.dist,
441                               s_op->u.pipeline.file_req,
442                               s_op->u.pipeline.file_req_offset,
443                               regions,
444                               s_op->u.pipeline.unaligned_size,
445                               NULL /* s_op->hints */);
446
447    msg_p->fs_id = s_op->u.pipeline.fs_id;
448    msg_p->handle = next_server_handle;
449    msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
450    msg_p->comp_fn = s2s_comp_fn;
451
452    ret = PINT_cached_config_map_to_server(&msg_p->svr_addr,
453                                           next_server_handle, //s_op->u.pipeline.handle,
454                                           s_op->u.pipeline.fs_id);
455    gossip_debug(GOSSIP_IO_DEBUG, "%s: msg_p->svr_addr=%llu\n", __func__,
456                 llu(msg_p->svr_addr));
457    if(ret < 0) {
458        gossip_err("Failed to map meta server address\n");
459        js_p->error_code = ret;
460        return SM_ACTION_COMPLETE;
461    }
462
463    js_p->error_code = 0;
464
465    PINT_sm_push_frame(smcb, 0, &s_op->msgarray_op);
466    return SM_ACTION_COMPLETE;
467}
468
469static int s2s_comp_fn(void *v_p, struct PVFS_server_resp *resp_p,
470                        int index)
471{
472    PINT_smcb *smcb = v_p;
473    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
474
475    gossip_debug(GOSSIP_IO_DEBUG, "%s called\n", __func__);
476    gossip_debug(GOSSIP_IO_DEBUG, "resp_p->status=%d\n", resp_p->status);
477    gossip_debug(GOSSIP_IO_DEBUG, "small_io: result_size=%lld\n",
478                     lld(resp_p->u.small_io.result_size));
479
480    assert(resp_p->op == PVFS_SERV_SMALL_IO);
481
482    if (resp_p->status != 0) {
483        return resp_p->status;
484    }
485
486    if(resp_p->u.small_io.result_size != 0) {
487        memcpy(s_op->u.pipeline.tmp_buf, resp_p->u.small_io.buffer,
488               resp_p->u.small_io.result_size);
489    }
490
491    return 0;
492}
493
494static PINT_sm_action do_comp_fn(struct PINT_smcb *smcb, job_status_s *js_p)
495{
496    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
497    struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);
498    js_p->error_code = 0;
499
500    if(s_op->u.pipeline.buffer) {
501        PVFS_size i;
502        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
503                     "%s: buffer_used=%lld, op=0x%x, datatype=0x%x, actual_size=%lld\n",
504                     __func__, lld(s_op->u.pipeline.buffer_used),
505                     s_op->u.pipeline.op,
506                     s_op->u.pipeline.datatype,
507                     lld(js_p->actual_size)); /* AS */
508
509        switch(s_op->u.pipeline.datatype) {
510        case ((int)0x4c000405): /* MPI_INT */
511            {
512                int *a = (int*)s_op->u.pipeline.buffer;
513                int result;
514                PVFS_size count = (s_op->u.pipeline.buffer_used-s_op->u.pipeline.unaligned_size)/((*PVFS_INT).ub);
515                int *tmp;
516
517                if(count == 0)
518                    return SM_ACTION_COMPLETE;
519                /* data is not aligned perfectly, so adjust it */
520                if(s_op->u.pipeline.unaligned_size != 0) {
521                    memcpy(((char*)&a[count])+(((*PVFS_INT).ub)-s_op->u.pipeline.unaligned_size), s_op->u.pipeline.tmp_buf, s_op->u.pipeline.unaligned_size);
522                    count++;
523                }
524               
525                if (parent_s_op->u.io.total_transferred == 0) {
526                    if (parent_s_op->u.io.tmp_buffer == NULL)
527                        parent_s_op->u.io.tmp_buffer = (void *)malloc(1*sizeof(int));
528                    memset(parent_s_op->u.io.tmp_buffer, 0, sizeof(int));
529                    parent_s_op->u.io.count = 0;
530                }
531                tmp = parent_s_op->u.io.tmp_buffer;
532                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "total_transferred=%lld\n", lld(parent_s_op->u.io.total_transferred));
533                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%d\n", lld(count), *tmp);
534
535                switch(s_op->u.pipeline.op) {
536                case 0x58000001: /* MAX */
537                    result = *a;
538                    for (i=1; i<count; i++ ) {
539                        if (a[i] > result) {
540                            result = a[i];
541                        }
542                    }
543                    a[0] = result;
544                    if (parent_s_op->u.io.total_transferred == 0 ||
545                        result > *tmp)
546                        *tmp = result;
547                    break;
548                case 0x58000002: /* MIN */
549                    result = *a;
550                    for (i=1; i<count; i++ ) {
551                        if (a[i] < result) {
552                            result = a[i];
553                        }
554                    }
555                    a[0] = result;
556                    if (parent_s_op->u.io.total_transferred == 0 ||
557                        result < *tmp)
558                        *tmp = result;
559                    break;
560                case 0x58000003: /* SUM */
561                    result = 0;
562                    for (i=0; i<count; i++ ) {
563                        if (i<10)
564                            gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%d\n",
565                                         lld(i), a[i]);
566                        result += a[i];
567                    }
568                    a[0] = result;
569                    *tmp += result;
570                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%d\n",
571                                 *tmp);
572                    break;
573                case (0x5800000e): /* MEAN */
574                    result = 0;
575                    for (i=0; i<count; i++) {
576                        result += a[i];
577                    }
578                    result = result/count;
579                    a[0] = result;
580                   
581                    if (parent_s_op->u.io.count == 0)
582                        *tmp = result;
583                    else {
584                        int tmp_sum = (*tmp)*(parent_s_op->u.io.count);
585                        tmp_sum = tmp_sum + result*count;
586                        *tmp = (tmp_sum)/(parent_s_op->u.io.count+count);
587                    }
588                    parent_s_op->u.io.count += count;
589                    gossip_debug(GOSSIP_IO_DEBUG, "mean=%d\n", *tmp);
590                    break;
591                default:
592                    break;
593                }
594                s_op->u.pipeline.buffer = (void *)a;
595                parent_s_op->u.io.tmp_buffer = (void *)tmp;
596            }
597            break;
598
599        case ((int)0x4c00040a): /* MPI_FLOAT */
600            {
601                float *a = (float*)s_op->u.pipeline.buffer;
602                float result;
603                PVFS_size count = (s_op->u.pipeline.buffer_used -
604                                   s_op->u.pipeline.unaligned_size)/((*PVFS_FLOAT).ub);
605                switch(s_op->u.pipeline.op) {
606                case 0x58000001: /* MAX */
607                    break;
608                case 0x58000002: /* MIN */
609                    break;
610                case 0x58000003: /* SUM */
611                    break;
612                case (0x5800000e): /* MEAN */
613                    break;
614                case (0x5800000f): /* KMEANS */
615                    js_p->error_code = DO_KMEANS;
616                    break;
617                default:
618                    break;
619                }
620            }
621            break;
622
623        case ((int)0x4c00080b): /* MPI_DOUBLE */
624            {
625                double *a = (double*)s_op->u.pipeline.buffer;
626                double result;
627                PVFS_size count = (s_op->u.pipeline.buffer_used-s_op->u.pipeline.unaligned_size)/((*PVFS_DOUBLE).ub);
628                double *tmp;
629                PVFS_offset strip_boundary = (int)(s_op->u.pipeline.loff/262144)*262144; /* FIXME */
630
631                if (s_op->u.pipeline.buffer_used < 262144 &&
632                    s_op->u.pipeline.loff != strip_boundary) { /* FIXME */
633                    count = (s_op->u.pipeline.buffer_used)/((*PVFS_DOUBLE).ub);
634                }
635                gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(count));
636                if(count < 1)
637                    return SM_ACTION_COMPLETE;
638
639                /* data is not aligned perfectly, so adjust it within the memory*/
640                if(s_op->u.pipeline.unaligned_size != 0) {
641                    PVFS_size adj_sz = s_op->u.pipeline.unaligned_size;
642                    PVFS_size tmp_sz = ((*PVFS_DOUBLE).ub) - adj_sz;
643                   
644                    if(s_op->u.pipeline.loff == strip_boundary) {
645                        memcpy(a, (char*)&a[0]+adj_sz,
646                               s_op->u.pipeline.buffer_used-adj_sz);
647                    }
648
649                    memcpy(((char*)&a[count])+tmp_sz, s_op->u.pipeline.tmp_buf,
650                           s_op->u.pipeline.unaligned_size);
651
652
653                    count++;
654                   
655                    if(s_op->u.pipeline.buffer_used < (262144-((*PVFS_DOUBLE).ub)))
656                        count--;
657                }
658
659                if (parent_s_op->u.io.total_transferred == 0) {
660                    if (parent_s_op->u.io.tmp_buffer == NULL)
661                        parent_s_op->u.io.tmp_buffer = (void *)malloc(1*sizeof(double));
662                    memset( parent_s_op->u.io.tmp_buffer, 0, sizeof(double));
663                    parent_s_op->u.io.count = 0;
664                }
665                tmp = parent_s_op->u.io.tmp_buffer;
666                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
667                             "total_transferred=%lld\n",
668                             lld(parent_s_op->u.io.total_transferred));
669                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%lf\n",
670                             lld(count), *tmp);
671                switch(s_op->u.pipeline.op) {
672                case 0x58000001: /* MAX */
673                    result = *a;
674                    for (i=1; i<count; i++ ) {
675                        if (a[i] > result) {
676                            result = a[i];
677                        }
678                    }
679                    a[0] = result;
680                    if (parent_s_op->u.io.total_transferred == 0 ||
681                        result > *tmp)
682                        *tmp = result;
683                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "max=%lf\n", *tmp);
684                    break;
685                case 0x58000002: /* MIN */
686                    result = *a;
687                    for (i=1; i<count; i++ ) {
688                        if (a[i] < result) {
689                            result = a[i];
690                        }
691                    }
692
693                    a[0] = result;
694                    if (parent_s_op->u.io.total_transferred == 0 ||
695                        result < *tmp)
696                        *tmp = result;
697                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "min=%lf\n",
698                                 *tmp);
699                    break;
700                case 0x58000003: /* SUM */
701                    result = 0;
702                    for (i=0; i<count; i++ ) {
703                        if (i<10 || i== (count-1) || i == count) gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%lf\n", lld(i), a[i]);
704                        result += a[i];
705                    }
706
707                    a[0] = result;
708                    *tmp += result;
709                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%lf\n",
710                                 *tmp);
711                    break;   
712                case (0x5800000e): /* MEAN */
713                    result = 0;
714                    for (i=0; i<count; i++) {
715                        result += a[i];
716                    }
717                    result = result/count;
718                    a[0] = result;
719
720                    gossip_debug(GOSSIP_IO_DEBUG, "count=%lld\n", lld(parent_s_op->u.io.count));
721
722                    if (parent_s_op->u.io.count == 0)
723                        *tmp = result;
724                    else {
725                        double tmp_sum = (*tmp)*(parent_s_op->u.io.count);
726                        tmp_sum = tmp_sum + result*count;
727                        *tmp = (tmp_sum)/(parent_s_op->u.io.count+count);
728                    }
729                    parent_s_op->u.io.count += count;
730                    gossip_debug(GOSSIP_IO_DEBUG, "mean=%lf\n", result);
731                    break;
732                case (0x5800000f): /* KMEANS */ {
733#if 0
734                    int index, numObjs, j;
735                    int numClusters=2, numCoords=4;
736                    double delta;
737                   
738                    for(i=0; i<numObjs; i++) {
739                        //result = a[i];
740                        index = find_nearest_cluster(numClusters, numCoords, a[i], clusters);
741                        if(membership[i] != index) delta += 1.0;
742                        membership[i] = index;
743
744                        newClusterSize[index]++;
745                        for(j=0; j<numCoords; j++)
746                            newClusters[index][j] += objects[i][j];
747                    }
748#endif
749                    js_p->error_code = DO_ALLREDUCE;
750                    break;
751                }
752                default:
753                    break;
754                } /* end inner switch */
755                s_op->u.pipeline.buffer = (void *)a;
756                parent_s_op->u.io.tmp_buffer = (void *)tmp;
757            }
758           
759            break;
760        default:
761            break;
762        } /* end switch() */
763    } /* end if() */
764
765    return SM_ACTION_COMPLETE;
766}
767
768static PINT_sm_action setup_kmeans_fn(struct PINT_smcb *smcb, job_status_s *js_p)
769{
770    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
771    int ret, i, j;
772    struct PINT_server_op *kmeans_op;
773    int numClusters = 20; /* FIXME: should come from client? */
774    int numCoords = 18; /* FIXME: should come from client? */
775
776    kmeans_op = malloc(sizeof(*kmeans_op));
777    memset(kmeans_op, 0, sizeof(*kmeans_op));
778
779    kmeans_op->u.kmeans.myRank = s_op->u.pipeline.dfile_index;
780    kmeans_op->u.kmeans.dfile_array = s_op->u.pipeline.dfile_array;
781    kmeans_op->u.kmeans.dfile_count = s_op->u.pipeline.dfile_count;
782    kmeans_op->u.kmeans.fs_id = s_op->u.pipeline.fs_id;
783    kmeans_op->u.kmeans.allreduce_step = 0;
784
785    kmeans_op->u.kmeans.numClusters = numClusters;
786    kmeans_op->u.kmeans.numCoords = numCoords;
787
788    kmeans_op->u.kmeans.threshold = 0.001; /* FIXME */
789    kmeans_op->u.kmeans.totalNumObjs = 17695; /* FIXME: should be obtained from allreduce(numObjs, SUM) */
790    kmeans_op->u.kmeans.numObjs = s_op->u.pipeline.buffer_used/sizeof(float)/numCoords;
791    gossip_debug(GOSSIP_IO_DEBUG, "numObjs=%d\n", kmeans_op->u.kmeans.numObjs);
792
793    /* allocate space for data points */
794    kmeans_op->u.kmeans.objects = (float**)malloc((kmeans_op->u.kmeans.numObjs) * sizeof(float*));
795    assert(kmeans_op->u.kmeans.objects != NULL);
796    kmeans_op->u.kmeans.objects[0] = (float*) malloc((kmeans_op->u.kmeans.numObjs)*(numCoords) * sizeof(float));
797    assert(kmeans_op->u.kmeans.objects[0] != NULL);
798    for (i=1; i<kmeans_op->u.kmeans.numObjs; i++)
799        kmeans_op->u.kmeans.objects[i] = kmeans_op->u.kmeans.objects[i-1] + (numCoords);
800
801    /* allocate a 2D space for clusters[] (coordinates of cluster centers)
802       this array should be the same across all processes */
803    kmeans_op->u.kmeans.clusters = (float **)malloc(numClusters*sizeof(float*));
804    assert(kmeans_op->u.kmeans.clusters != NULL);
805    kmeans_op->u.kmeans.clusters[0] = (float *)malloc(numClusters*numCoords*sizeof(float));
806    assert(kmeans_op->u.kmeans.clusters[0] != NULL);
807
808    for(i=1; i<numClusters; i++)
809        kmeans_op->u.kmeans.clusters[i] = kmeans_op->u.kmeans.clusters[i-1] + numCoords;
810
811    /* FIXME: better way to copy the read data to objects[][]?? */
812    for(i=0; i<kmeans_op->u.kmeans.numObjs; i++)
813        memcpy(kmeans_op->u.kmeans.objects[i],
814               s_op->u.pipeline.buffer+(i*kmeans_op->u.kmeans.numCoords*sizeof(float)),
815               kmeans_op->u.kmeans.numCoords*sizeof(float));
816
817    /* pick first numClusters elements in feature[] as initial cluster centers*/
818    if(s_op->u.pipeline.dfile_index == 0) {
819        for (i=0; i<numClusters; i++)
820            for (j=0; j<numCoords; j++) {
821                kmeans_op->u.kmeans.clusters[i][j] = kmeans_op->u.kmeans.objects[i][j];
822                gossip_debug(GOSSIP_IO_DEBUG, "clusters[%d][%d]=%lf\n", i, j, kmeans_op->u.kmeans.clusters[i][j]);
823            }
824    }
825   
826    kmeans_op->u.kmeans.allreduce_step = 0;
827
828    ret = PINT_sm_push_frame(smcb, 0, kmeans_op);
829
830    js_p->error_code = 0;
831    return SM_ACTION_COMPLETE;
832}
833
834static PINT_sm_action cleanup_kmeans_fn(struct PINT_smcb *smcb, job_status_s *js_p)
835{
836    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
837    int task_id;
838    int remaining;
839    PVFS_error tmp_err;
840    struct PINT_server_op *kmeans_op;
841
842    kmeans_op = PINT_sm_pop_frame(smcb, &task_id, &tmp_err,
843            &remaining);
844    gossip_debug(GOSSIP_SERVER_DEBUG,
845                 "pipeline: nested sm (kmeans) returned error code: %d\n", tmp_err);
846    //memcpy(s_op->u.pipeline.buffer, kmeans_op->u.kmeans.send_buf, FIXME);
847    //free(kmeans_op->u.kmeans.clusters[0]);
848    free(kmeans_op->u.kmeans.clusters);
849    free(kmeans_op);
850
851    js_p->error_code = 0;
852    return SM_ACTION_COMPLETE;
853}
854
855static PINT_sm_action check_pipeline_done(struct PINT_smcb *smcb, job_status_s *js_p)
856{
857    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
858    struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);
859    PINT_segpool_handle_t h = s_op->u.pipeline.seg_handle;
860    js_p->error_code = 0;
861
862    /* FIMXE: do we really need this lock? */
863    gen_mutex_lock(&parent_s_op->u.io.mutex);
864    //parent_s_op->u.io.total_transferred += js_p->actual_size;
865    parent_s_op->u.io.total_transferred += s_op->u.pipeline.buffer_used;
866    gen_mutex_unlock(&parent_s_op->u.io.mutex);
867   
868    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%lld\n", __func__,
869                 lld(parent_s_op->u.io.total_transferred));
870    gossip_debug(GOSSIP_IO_DEBUG, "%s: actual_size=%lld\n", __func__,
871                 lld(js_p->actual_size));
872
873    /* FIXME: */
874    /* unless the second condition is set, the server starts
875       a new read request that is already done, and falls into
876       infinite trove_read->bmi_send->check_done loop */
877    if(!segpool_done(h) && s_op->u.pipeline.segs != 0) {
878        gossip_debug(GOSSIP_IO_DEBUG, "%s: LOOP\n", __func__);
879        js_p->error_code = LOOP;
880    }
881    else {
882#if 0
883        if(s_op->u.pipeline.op == (0x5800000f)) { /* KMEANS */
884            memcpy(parent_s_op->u.io.tmp_buffer,
885                   s_op->u.pipeline.buffer, sizeof(double));
886        }
887#endif
888        gossip_debug(GOSSIP_IO_DEBUG, "%s: DONE\n", __func__);
889    }
890   
891    return SM_ACTION_COMPLETE;
892}
893
894/*
895 * Local variables:
896 *  mode: c
897 *  c-indent-level: 4
898 *  c-basic-offset: 4
899 * End:
900 *
901 * vim: ft=c ts=8 sts=4 sw=4 expandtab
902 */
Note: See TracBrowser for help on using the browser.