root/branches/orange-next/src/server/create-immutable-copies.sm @ 8950

Revision 8950, 79.1 KB (checked in by bligon, 23 months ago)

Corrected compiler errors. Files affected:

src/server/create-immutable-copies.sm
src/server/get-attr.sm

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/* adding a comment */
8
9#include <string.h>
10#include <assert.h>
11
12#include "server-config.h"
13#include "pvfs2-server.h"
14#include "pvfs2-attr.h"
15#include "pvfs2-internal.h"
16#include "pvfs2-util.h"
17#include "pint-util.h"
18#include "pint-eattr.h"
19#include "pint-cached-config.h"
20#include "pvfs2-dist-basic.h"
21#include "pvfs2-mirror.h"
22
23/*Global Variables*/
24
25static char PVFS_handle_string[PVFS_HANDLE_STRING_LEN]={0};
26
27/*attribute keys used for the mirroring process*/
28static char handle_key[]     = USER_PVFS2_MIRROR_HANDLES;
29static char copy_count_key[] = USER_PVFS2_MIRROR_COPIES;
30static char status_key[]     = USER_PVFS2_MIRROR_STATUS;
31static char mode_key[]       = USER_PVFS2_MIRROR_MODE;
32
33enum {
34   LOCAL_HANDLES = 100,
35   REMOTE_HANDLES,
36   REPLACE_DONE,
37   LOCAL_SRC,
38   REMOTE_SRC,
39   RETRY,
40   NOTHING_TO_DO
41};
42
43
44#define SERVER_NAME_MAX   1024
45#define WRITE_RETRY_LIMIT 2
46#define DEFAULT_COPIES    1
47
48
49/*helper macros*/
50
51/*Sets up a two dimensional array from a one dimensional array*/
52#define ONE_DIM_TO_TWO_DIMS(in,out,rows,cols,type) \
53  do {                                             \
54     int i;                                        \
55     type *p;                                      \
56     for (i=0,p=in; i<rows; i++,p+=cols)           \
57         out[i] = p;                               \
58  } while (0)
59
60
61/*prototypes*/
62static int mirror_comp_fn(
63           void *v_p
64          ,struct PVFS_server_resp *resp_p
65          ,int i);
66
67static PVFS_handle *reorganize_copy_handles(
68           struct PINT_server_create_copies_op *imm_p);
69
70static int get_server_names(PINT_server_create_copies_op *imm_p);
71
72
73/*start of state machine*/
74%%
75
76nested machine pvfs2_create_immutable_copies_sm
77{
78   state initialize_structures
79    {
80        run initialize_structures;
81        success => obtain_source_info;
82        default => cleanup;
83    }
84
85   state obtain_source_info
86    {
87        run obtain_source_info;
88        success => inspect_source_info;
89        default => cleanup;
90    }
91
92   state inspect_source_info
93    {
94        run inspect_source_info;
95        success => create_local_datahandles;
96        default => cleanup;
97    }
98
99   state create_local_datahandles
100    {
101        run create_local_datahandles;
102        success => obtain_local_handle_sizes;
103        default => cleanup;
104    }
105
106   state obtain_local_handle_sizes
107    {
108        run obtain_local_handle_sizes;
109        success => inspect_local_handle_sizes;
110        default => cleanup;
111    }
112
113   state inspect_local_handle_sizes
114    {
115        run inspect_local_handle_sizes;
116        success => create_remote_datahandles;
117        default => cleanup;     
118    }
119
120   state create_remote_datahandles
121    {
122        run create_remote_datahandles;
123        success => setup_datahandle_copies;
124        default => remove_local_datahandle_objects;
125    }
126
127   state setup_datahandle_copies
128    {
129        run setup_datahandle_copies;
130        success => copy_data;
131        default => remove_local_datahandle_objects;
132    }
133
134   state copy_data
135    {
136        pjmp copy_data
137        {
138           LOCAL_SRC  => pvfs2_pjmp_mirror_work_sm;
139           REMOTE_SRC => pvfs2_pjmp_call_msgpairarray_sm;
140        }
141        success => check_copy_results;
142        default => cleanup;
143    }
144
145   state check_copy_results
146    {
147        run check_copy_results;
148        success => store_mirror_info;
149        default => check_for_retries;
150    }
151
152   state check_for_retries
153    {
154        run check_for_retries;
155        RETRY   => copy_data;
156        default => store_mirror_info;
157    }
158
159   state store_mirror_info
160    {
161        run store_mirror_info;
162        /*default => replace_remote_datahandle_objects;*/
163        /*If the write of the datahandle information fails, even though the  */
164        /*the copies actually exist, the metadata for the logical file will  */
165        /*NOT have knowledge of it.                                          */
166        default => check_store_job;
167    }
168
169   state check_store_job
170    {
171        run check_store_job;
172        default => cleanup;
173    }
174
175   state replace_remote_datahandle_objects
176    {
177        run replace_remote_datahandle_objects;
178        REPLACE_DONE => remove_local_datahandle_objects;
179        default      => replace_remote_datahandle_objects;
180    }
181
182   state remove_local_datahandle_objects
183    {
184        run remove_local_datahandle_objects;
185        default => cleanup;
186    }
187
188   state cleanup
189    {
190        run cleanup;
191        default => return;
192    }
193
194} /*end nested state machine pvfs2_create_immutable_copies_sm*/
195%%
196
197
198/************************************************************************/
199/*Actions for pvfs2_create_immutable_copies_sm                          */
200/************************************************************************/
201static PINT_sm_action initialize_structures (struct PINT_smcb *smcb
202                                            ,job_status_s *js_p)
203{
204   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing initialize_structures....\n");
205   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tframe count is %d.\n",smcb->frame_count);
206   gossip_debug(GOSSIP_MIRROR_DEBUG,"\t base frame is %d.\n",smcb->base_frame);
207
208   struct PINT_server_op *s_op = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
209   PINT_server_create_copies_op *imm_p = &(s_op->u.create_copies);
210
211   int ret;
212
213   js_p->error_code = 0;
214
215   /* These values are generated by default when the prelude executes.        */
216   /* When seteattr executes, the prelude retrieves the common metadata info. */
217   imm_p->dfile_count = s_op->target_object_attr->u.meta.dfile_count;
218   PVFS_handle_copy(imm_p->metadata_handle, s_op->target_handle);
219   imm_p->fs_id = s_op->target_fs_id;
220
221   /* Get the number of IO servers currently running for the given filesystem */
222   ret = PINT_cached_config_get_num_io(imm_p->fs_id,&(imm_p->io_servers_count));
223   if (ret)
224   {
225      js_p->error_code = ret;
226      return SM_ACTION_COMPLETE;
227   }
228
229   PVFS_handle_unparse(imm_p->metadata_handle,PVFS_handle_string);
230   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tdfile_count: %d\tmetadata_handle: %s"
231                                    "\tfs_id: %u"
232                                    "\tio_servers_count: %d\n"
233                                   ,imm_p->dfile_count
234                                   ,PVFS_handle_string
235                                   ,imm_p->fs_id
236                                   ,imm_p->io_servers_count );
237   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tds_attr.b_size:%d\n"
238                                   ,(int)s_op->ds_attr.u.datafile.b_size);
239
240
241   return SM_ACTION_COMPLETE;
242} /*end action initialize_structures*/
243
244
245
246static PINT_sm_action obtain_source_info (struct PINT_smcb *smcb
247                                         ,job_status_s *js_p)
248{
249 /*In this state, we are retrieving the data handles, the number of      */
250 /*desired copies, and the mirroring mode for the given meta data handle.*/
251 /*If the mirroring mode == NO_MIRRORING, then we will not perform the   */
252 /*mirror operation.                                                     */   
253   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing obtain_source_info....\n");
254
255   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
256   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
257   int keyval_count = 3;
258   job_id_t job_id;
259   int ret = 0,i;
260
261   js_p->error_code = 0;
262
263   /*allocate space to retrieve attributes from trove.*/
264   sm_p->keyval_count = keyval_count;
265
266   sm_p->key_a   = malloc(sizeof(*sm_p->key_a)   * sm_p->keyval_count);
267   sm_p->val_a   = malloc(sizeof(*sm_p->val_a)   * sm_p->keyval_count);
268   sm_p->error_a = malloc(sizeof(*sm_p->error_a) * sm_p->keyval_count);
269   if (!sm_p->key_a || !sm_p->val_a || !sm_p->error_a)
270      goto error_exit;
271 
272   memset(sm_p->key_a  , 0, sizeof(*sm_p->key_a)   * sm_p->keyval_count);
273   memset(sm_p->val_a  , 0, sizeof(*sm_p->val_a)   * sm_p->keyval_count);
274   memset(sm_p->error_a, 0, sizeof(*sm_p->error_a) * sm_p->keyval_count);
275
276   /*setup key/val to retreive the mirroring mode*/
277   i=0; assert(i<keyval_count);
278   sm_p->key_a[i].buffer = mode_key;
279   sm_p->key_a[i].buffer_sz = sizeof(mode_key);
280
281   sm_p->val_a[i].buffer = &(imm_p->mirror_mode);
282   sm_p->val_a[i].buffer_sz = sizeof(imm_p->mirror_mode);
283
284
285   /*setup key/val to retreive the datahandles*/
286   i++; assert(i<keyval_count);
287   sm_p->key_a[i].buffer    = Trove_Common_Keys[METAFILE_HANDLES_KEY].key;
288   sm_p->key_a[i].buffer_sz = Trove_Common_Keys[METAFILE_HANDLES_KEY].size;
289
290   imm_p->handle_array_base = malloc(imm_p->dfile_count * sizeof(PVFS_handle));
291   if (!imm_p->handle_array_base)
292      goto error_exit;
293   sm_p->val_a[i].buffer    =  imm_p->handle_array_base;
294   sm_p->val_a[i].buffer_sz = (imm_p->dfile_count * sizeof(PVFS_handle));
295
296   /*setup key/val to retreive the number of copies*/
297   i++; assert(i<keyval_count);
298   sm_p->key_a[i].buffer    = copy_count_key;
299   sm_p->key_a[i].buffer_sz = sizeof(copy_count_key);
300
301   sm_p->val_a[i].buffer    = &(imm_p->copies);
302   sm_p->val_a[i].buffer_sz = sizeof(imm_p->copies);
303
304
305/***** don't need to get file's distriubtion information, because we are */
306/***** copying each datahandle, as is, directly into a new datahandle.   */
307/***** Distribution is only needed when you are modifying the logical    */
308/***** file, not the individual data handles.  However, we need to pro-  */
309/***** vide a distribution value for the IO request, even though it won't*/
310/***** be used.  So, instead of issuing a trove call to get the file's   */
311/***** distribution information, we will be using just the "basic" dis-  */
312/***** tribution.                                                        */
313
314   imm_p->dist = malloc(sizeof(PINT_dist));
315   if (!imm_p->dist)
316      goto error_exit;
317   memset(imm_p->dist,0,sizeof(PINT_dist));
318
319   imm_p->dist->dist_name = malloc(PVFS_DIST_BASIC_NAME_SIZE);
320   if (!imm_p->dist->dist_name)
321      goto error_exit;
322   strcpy(imm_p->dist->dist_name,PVFS_DIST_BASIC_NAME);
323   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tdistribution name:%s\n"
324                                   ,imm_p->dist->dist_name);
325
326   ret = PINT_dist_lookup(imm_p->dist);
327   if (ret)
328   {
329      gossip_lerr("Error looking up basic distribution:%d",ret);
330      js_p->error_code = ret;
331      goto error_exit;
332   }
333
334   /*retrieve key/val pairs */
335   ret = job_trove_keyval_read_list( imm_p->fs_id
336                                    ,imm_p->metadata_handle
337                                    ,sm_p->key_a
338                                    ,sm_p->val_a
339                                    ,sm_p->error_a
340                                    ,sm_p->keyval_count
341                                    ,0
342                                    ,NULL
343                                    ,smcb
344                                    ,0
345                                    ,js_p
346                                    ,&job_id
347                                    ,server_job_context
348                                    ,NULL );
349    return (ret);
350
351error_exit:
352   if (sm_p->key_a)
353      free (sm_p->key_a);
354   if (sm_p->val_a)
355      free (sm_p->val_a);
356   if (sm_p->error_a)
357      free(sm_p->error_a);
358   sm_p->key_a   = sm_p->val_a = NULL;
359   sm_p->error_a = NULL;
360
361   if (imm_p->dist && imm_p->dist->dist_name)
362      free(imm_p->dist->dist_name);
363   if (imm_p->dist)
364      free(imm_p->dist);
365   imm_p->dist = NULL;
366
367   if (js_p->error_code == 0)
368      js_p->error_code = -PVFS_ENOMEM;
369   return SM_ACTION_COMPLETE;
370}/*end action obtain_source_info*/
371
372
373
374
375
376static PINT_sm_action inspect_source_info (struct PINT_smcb *smcb
377                                          ,job_status_s *js_p)
378{   
379   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing inspect_source_info....\n");
380
381   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
382   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
383   char server_name[SERVER_NAME_MAX] = {0};
384   server_configuration_s *config = get_server_config_struct();
385   int ret = 0;
386   int i;
387
388   /*check error codes from previous trove read-list call. */
389   for (i=0; i<sm_p->keyval_count; i++)
390   {
391      /*if the mirroring mode has no entry, the mode=NO_MIRRORING, or the mode*/
392      /*is not the expected mode, then there is nothing to do.                */
393      if (sm_p->key_a[i].buffer == mode_key)
394      {
395          if (PVFS_get_errno_mapping(sm_p->error_a[i]) == ENOENT)
396          {
397                 js_p->error_code = NOTHING_TO_DO;
398                 goto error_exit;
399          }
400          imm_p->mirror_mode = *(MIRROR_MODE *)sm_p->val_a[i].buffer;
401          gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRetrieved mirroring mode is %d.\n"
402                                          ,imm_p->mirror_mode);
403          if (imm_p->mirror_mode == NO_MIRRORING ||
404              imm_p->mirror_mode != imm_p->expected_mirror_mode)
405          {
406             js_p->error_code = NOTHING_TO_DO;
407             goto error_exit;
408          }
409      }
410
411      /*if the user hasn't set the number of copies, this code will */
412      /*set a default (currently = 1).                              */
413      if (sm_p->key_a[i].buffer == copy_count_key)
414      {
415         if (PVFS_get_errno_mapping(sm_p->error_a[i]) == ENOENT)
416         {
417            gossip_lerr("User-defined number of copies not found. "
418                        "Defaulting number of copies to %d.\n"
419                       ,DEFAULT_COPIES);
420            imm_p->copies = DEFAULT_COPIES;
421            continue;
422         }
423      }
424
425      /*check for other types of errors.*/
426      if (sm_p->error_a[i])
427      {
428          gossip_lerr("Error retrieving value for '%s' : %s\n"
429                     ,(char *)sm_p->key_a[i].buffer
430                     ,strerror(PVFS_get_errno_mapping(-sm_p->error_a[i])));
431          js_p->error_code = sm_p->error_a[i];
432          goto error_exit;
433      }
434   }/*end for*/
435
436   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRetrieved # of copies:%d\n"
437                                   ,imm_p->copies);
438
439   /*If there is only one server running, then it makes no sense to create */
440   /*copies on the same server.                                            */
441   if (imm_p->io_servers_count == 1)
442   {
443       gossip_lerr("Mirroring operation is not permitted when only one "
444                   "I/O server is running.\n");
445       js_p->error_code = -PVFS_EPERM;
446       return SM_ACTION_COMPLETE;
447   }
448
449   /* We need at least (# of copies) + 1 I/O servers running in the system to */
450   /* prevent duplicate data on any one server, while not exceeding the number*/
451   /* of I/O servers in the system. If the number of copies requested by the  */
452   /* user is >= the number of I/O servers in the system, then we lower the   */
453   /* number of requested copies.  We then set the number of I/O servers      */
454   /* required to meet this request with the (new value of copies) + 1.       */
455   /* At this point, if the number of I/O servers required is less than the   */
456   /* number of servers in this file's distribution, then set the number of   */
457   /* required I/O servers to the same number of servers in this file's dis-  */
458   /* tribution.                                                              */
459   if (imm_p->copies >= imm_p->io_servers_count)
460       imm_p->copies = imm_p->io_servers_count - 1;
461
462   imm_p->io_servers_required = imm_p->copies + 1;
463
464   if (imm_p->io_servers_required < imm_p->dfile_count)
465       imm_p->io_servers_required = imm_p->dfile_count;
466
467   /*allocate space for io_servers array.  this array will contain the server */
468   /*names which will be used as the valid destination remotes for the copies.*/
469   imm_p->io_servers = malloc( imm_p->io_servers_required * sizeof(char *) );
470   if ( !imm_p->io_servers )
471   {
472       js_p->error_code = -PVFS_ENOMEM;
473       goto error_exit;
474   }
475   memset(imm_p->io_servers,0,imm_p->io_servers_required * sizeof(char *));
476   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tAllocated char *:\n");
477   for (i=0; i<imm_p->io_servers_required; i++)
478       gossip_debug(GOSSIP_MIRROR_DEBUG,"\t  io_servers[%d] : %p "
479                                        "\t &io_servers[%d] : %p\n"
480                                       ,i,imm_p->io_servers[i]
481                                       ,i,&(imm_p->io_servers[i]));
482   imm_p->num_io_servers = imm_p->io_servers_required;
483   for (i=0;i<imm_p->io_servers_required;i++)
484   {
485       imm_p->io_servers[i] = malloc( sizeof(server_name) );
486       if ( !imm_p->io_servers[i] )
487       {
488           js_p->error_code = -PVFS_ENOMEM;
489           goto error_exit;
490       }
491       memset(imm_p->io_servers[i],0,sizeof(server_name));
492   }
493   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tAllocated server_name..\n");
494   for (i=0; i<imm_p->io_servers_required; i++)
495       gossip_debug(GOSSIP_MIRROR_DEBUG,"\t  io_servers[%d] : %p "
496                                        "\t *io_servers[%d] : %s "
497                                        "\t &io_servers[%d] : %p\n"
498                                       ,i,imm_p->io_servers[i]
499                                       ,i,imm_p->io_servers[i]
500                                       ,i,&(imm_p->io_servers[i]));
501
502
503
504   /*allocate space for the local source handles*/
505   imm_p->handle_array_base_local =
506       malloc(imm_p->dfile_count * sizeof(PVFS_handle));
507   if (!imm_p->handle_array_base_local)
508   {
509      js_p->error_code = -PVFS_ENOMEM;
510      goto error_exit;
511   }
512   memset(imm_p->handle_array_base_local
513         ,0
514         ,imm_p->dfile_count * sizeof(PVFS_handle));
515   imm_p->handle_array_base_local_count = 0;
516
517
518   /*allocate space for the data handle copies array, which will hold a       */
519   /*combination of local and remote data handles used as destination handles */
520   /*for the copies. The order of this array will mimmick the order of the    */
521   /*original data file array.  Thus, handle_array_base[i]                    */
522   /*and handle_array_copies[i] will hold handles for the same server number. */
523   imm_p->handle_array_copies = malloc(imm_p->io_servers_required 
524                                       * imm_p->copies     
525                                       * sizeof(PVFS_handle));
526   if ( !imm_p->handle_array_copies )
527   {
528       js_p->error_code = -PVFS_ENOMEM;
529       goto error_exit;
530   }
531   memset(imm_p->handle_array_copies,0,imm_p->io_servers_required 
532                                       * imm_p->copies     
533                                       * sizeof(PVFS_handle));
534
535   /*allocate space for the local_io_servers array.  this array contains the */
536   /*server names for local data handles.                                    */
537   imm_p->local_io_servers = malloc(  imm_p->io_servers_required
538                                    * sizeof(char *));
539   if ( !imm_p->local_io_servers )
540   {
541       js_p->error_code = -PVFS_ENOMEM; 
542       goto error_exit;
543
544   }
545   memset(imm_p->local_io_servers
546         ,0
547         ,imm_p->io_servers_required * sizeof(char *));
548   imm_p->local_io_servers_count = 0;
549
550
551
552   /*allocate space for the remote_io_servers array.  this array contains the */
553   /*server names for remote data handles.                                    */
554   imm_p->remote_io_servers = malloc(  imm_p->io_servers_required
555                                     * sizeof(char*));
556   if ( !imm_p->remote_io_servers )
557   {
558       js_p->error_code = -PVFS_ENOMEM;
559       goto error_exit;
560   }
561   memset(imm_p->remote_io_servers
562         ,0
563         ,imm_p->io_servers_required * sizeof(char*));
564   imm_p->remote_io_servers_count = 0;
565
566   /*populate the io_servers array with server_names:                         */
567   /*Step 1:  Always start by using the server names associated with the      */
568   /*         original datahandles, keeping the order in tact.                */
569   /*Step 2:  If additional servers are needed, then tap into the list of     */
570   /*         servers running in the file system and grab those that are not  */
571   /*         currently in the io_servers list.                               */
572
573   /*Step 1*/   
574   for (i=0; i<imm_p->dfile_count; i++)
575   {
576      ret = PINT_cached_config_get_server_name( imm_p->io_servers[i],
577                                                sizeof(server_name)-1,
578                                                imm_p->handle_array_base[i],
579                                                imm_p->fs_id );
580      if (ret)
581      {
582         js_p->error_code = ret;
583         goto error_exit;
584      }
585      PVFS_handle_unparse(imm_p->handle_array_base[i],PVFS_handle_string);
586      gossip_debug(GOSSIP_MIRROR_DEBUG,"\tValue of server_name is %s "
587                                       "for handle %s\n"
588                                      ,imm_p->io_servers[i]
589                                      ,PVFS_handle_string);
590   }/*end for*/
591
592   /*Step 2*/
593   if (imm_p->io_servers_required > imm_p->dfile_count)
594   {
595      ret = get_server_names(imm_p);
596      if (ret)
597      {
598          gossip_lerr("Unable to populate io_servers list.\n");
599          js_p->error_code = ret;
600          goto error_exit;
601      }
602   }
603
604
605   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tconfig->host_id is %s.\n"
606                                   ,config->host_id);
607     
608   gossip_debug(GOSSIP_MIRROR_DEBUG,"\timm_p->io_servers_required : %d\n"
609                                   ,imm_p->io_servers_required);
610   for (i=0; i<imm_p->io_servers_required; i++)
611   {
612      char *server_name = imm_p->io_servers[i];
613      if (strncmp(server_name,config->host_id,SERVER_NAME_MAX-1) == 0)
614      {/*local*/
615         gossip_debug(GOSSIP_MIRROR_DEBUG,"\tprocessing local....\n");
616         imm_p->local_io_servers[imm_p->handle_array_copies_local_count] =
617             malloc( SERVER_NAME_MAX );
618         if ( !imm_p->local_io_servers[imm_p->handle_array_copies_local_count] )
619         {
620              js_p->error_code = -PVFS_ENOMEM;
621              goto error_exit;
622         }
623         memset(imm_p->local_io_servers[imm_p->handle_array_copies_local_count]
624               ,0
625               ,SERVER_NAME_MAX);
626         memcpy(imm_p->local_io_servers[imm_p->handle_array_copies_local_count],
627                server_name,SERVER_NAME_MAX-1);
628         if ( i < imm_p->dfile_count )
629         {
630            PVFS_handle_copy(
631                imm_p->handle_array_base_local[
632                    imm_p->handle_array_base_local_count],
633                imm_p->handle_array_base[i]);
634
635            imm_p->handle_array_base_local_count++;
636            PVFS_handle_unparse(imm_p->handle_array_base_local[imm_p->handle_array_base_local_count]
637                               ,PVFS_handle_string);
638            gossip_debug(GOSSIP_MIRROR_DEBUG,"\tlocal source handle(%d):%s\n"
639              ,imm_p->handle_array_base_local_count
640              ,PVFS_handle_string);
641         }
642
643         imm_p->handle_array_copies_local_count++;
644      }
645      else
646      {/*remote*/
647         imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count] =
648             malloc( SERVER_NAME_MAX );
649         if (!imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count])
650         {
651               js_p->error_code = -PVFS_ENOMEM;
652               goto error_exit;
653         }
654        memset(imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count]
655              ,0
656              ,SERVER_NAME_MAX);
657        memcpy(imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count]
658              ,server_name,SERVER_NAME_MAX-1);
659         imm_p->handle_array_copies_remote_count++;
660      }/*end if*/
661   }/*end for*/
662
663   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tLocal: %d\tRemote: %d\n"
664                                   ,imm_p->handle_array_copies_local_count
665                                   ,imm_p->handle_array_copies_remote_count);
666
667   imm_p->local_io_servers_count = imm_p->handle_array_copies_local_count;
668   imm_p->remote_io_servers_count = imm_p->handle_array_copies_remote_count;
669
670   for (i=0; i<imm_p->local_io_servers_count; i++)
671   {
672        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tlocal_io_servers[%d]: %s\n",i,
673          imm_p->local_io_servers[i]);
674   }
675   for (i=0; i<imm_p->remote_io_servers_count; i++)
676   {
677        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tremote_io_servers[%d]: %s\n",i,
678          imm_p->remote_io_servers[i]);
679   }
680
681
682   /*allocate and initialize space for local and remote handle arrays*/
683   if (imm_p->handle_array_copies_local_count)
684   {
685      imm_p->handle_array_copies_local =
686        malloc( imm_p->handle_array_copies_local_count *
687                imm_p->copies * sizeof(PVFS_handle));
688     if ( !imm_p->handle_array_copies_local )
689      {
690         js_p->error_code = -PVFS_ENOMEM;
691         goto error_exit;
692      }
693   }
694   if (imm_p->handle_array_copies_remote_count)
695   {
696      imm_p->handle_array_copies_remote =
697        malloc( imm_p->handle_array_copies_remote_count *
698                imm_p->copies * sizeof(PVFS_handle));
699      if ( !imm_p->handle_array_copies_remote )
700      {
701         js_p->error_code = -PVFS_ENOMEM;
702         goto error_exit;
703      }
704     
705   }/*end if*/
706
707   memset(imm_p->handle_array_copies_local
708         ,0
709         ,imm_p->handle_array_copies_local_count  *
710          imm_p->copies * sizeof(PVFS_handle));
711   memset(imm_p->handle_array_copies_remote
712         ,0
713         ,imm_p->handle_array_copies_remote_count *
714          imm_p->copies * sizeof(PVFS_handle));
715
716error_exit:
717   /*all other memory will be freed in the "cleanup" action.*/
718   free(sm_p->key_a);
719   free(sm_p->val_a);
720   free(sm_p->error_a);
721   sm_p->key_a = sm_p->val_a = NULL;
722   sm_p->error_a = NULL;
723
724   return SM_ACTION_COMPLETE;
725}/*end action inspect_source_info*/
726
727
728
729/*We must get the bstream size for any datahandles that reside on this server.*/
730/*This scenario occurs when a metadata and i/o server are one of the same or  */
731/*a metadata server and i/o server are running on the same machine.  I think  */
732/*this is outdated now, but I check for it anyway.                            */
733static PINT_sm_action obtain_local_handle_sizes(struct PINT_smcb *smcb
734                                               ,job_status_s *js_p)
735{   
736   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing obtain_local_handle_sizes....\n");
737
738   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
739   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
740   job_id_t job_id;
741   int ret = 0;
742
743   js_p->error_code = 0;
744
745
746   /*Do we have any local handles?*/
747   if (imm_p->handle_array_copies_local_count == 0)
748       return SM_ACTION_COMPLETE;
749
750   sm_p->error_a = malloc(imm_p->handle_array_base_local_count *
751                           sizeof(PVFS_error) );
752   if (!sm_p->error_a)
753   {
754      js_p->error_code = -PVFS_ENOMEM;
755      goto error_exit;
756   }
757
758   imm_p->ds_attr_a = malloc(imm_p->handle_array_base_local_count *
759                             sizeof(PVFS_ds_attributes));
760   if (!imm_p->ds_attr_a)
761   {
762      js_p->error_code = -PVFS_ENOMEM;
763      goto error_exit;
764   }
765   
766   ret = job_trove_dspace_getattr_list(imm_p->fs_id
767                                      ,imm_p->handle_array_base_local_count
768                                      ,imm_p->handle_array_base_local
769                                      ,smcb
770                                      ,sm_p->error_a
771                                      ,imm_p->ds_attr_a
772                                      ,0
773                                      ,js_p
774                                      ,&job_id
775                                      ,server_job_context
776                                      ,NULL);
777   return ret;
778
779
780error_exit:
781   if (sm_p->error_a)
782      free(sm_p->error_a);
783   sm_p->error_a = NULL;
784
785   return SM_ACTION_COMPLETE;
786}/*end action obtain_local_handle_sizes*/
787
788
789
790static PINT_sm_action inspect_local_handle_sizes(struct PINT_smcb *smcb
791                                                ,job_status_s *js_p)
792{   
793   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing inspect_local_handle_sizes..\n");
794
795   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
796   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
797   int i,j;
798
799
800   /*Do we have any local handles?*/
801   if (imm_p->handle_array_copies_local_count == 0)
802       return SM_ACTION_COMPLETE;
803
804   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tchecking for errors....\n");
805   /*check for errors*/
806   for (i=0; i<imm_p->handle_array_base_local_count; i++)
807   {
808       if (sm_p->error_a[i])
809       {
810          js_p->error_code = sm_p->error_a[i];
811          free(sm_p->error_a);
812          sm_p->error_a = NULL;
813          return SM_ACTION_COMPLETE;
814       }
815   }/*end for*/
816   js_p->error_code = 0;
817
818   imm_p->bstream_array_base_local = malloc(imm_p->dfile_count *
819                                            sizeof(PVFS_size));
820   if (!imm_p->bstream_array_base_local)
821   {
822       js_p->error_code = -PVFS_ENOMEM;
823       free(sm_p->error_a);
824       sm_p->error_a = NULL;
825       return SM_ACTION_COMPLETE;
826   }
827   memset(imm_p->bstream_array_base_local
828         ,0
829         ,imm_p->dfile_count * sizeof(PVFS_size));
830
831   gossip_debug(GOSSIP_MIRROR_DEBUG
832               ,"\tpopulating bstream_array_base_local...\n");
833
834   gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle_array_base_local_count:%d\n"
835                                   ,imm_p->handle_array_base_local_count);
836   /*populate bstream_array_base_local*/
837   for (i=0; i<imm_p->handle_array_base_local_count; i++)
838   {
839       for (j=0; j<imm_p->dfile_count; j++)
840       {
841            gossip_debug(GOSSIP_MIRROR_DEBUG,"\tlocal handle(%d):%llu"
842                                             "\tbase handle(%d):%llu\n"
843               ,i,llu(imm_p->handle_array_base_local[i])
844               ,j,llu(imm_p->handle_array_base[j]) );
845            if (imm_p->handle_array_base_local[i] == imm_p->handle_array_base[j])
846            {
847              imm_p->bstream_array_base_local[j] =
848              imm_p->ds_attr_a[i].u.datafile.b_size;
849              gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle:%llu\tsize:%d\n"
850                  ,llu(imm_p->handle_array_base_local[i])
851                  ,(int)imm_p->bstream_array_base_local[j] );
852            }
853       }/*end for*/
854   }/*end for*/
855
856   free(sm_p->error_a);
857   sm_p->error_a = NULL;
858
859   return SM_ACTION_COMPLETE;
860}/*end action inspect_local_handle_sizes*/
861
862
863
864static PINT_sm_action create_local_datahandles (struct PINT_smcb *smcb
865                                               ,job_status_s *js_p)
866{   
867   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing create_local_datahandles....\n");
868
869   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
870   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
871   job_id_t job_id;
872   int ret = 0;
873   int i;
874
875   PVFS_handle_extent_array data_handle_ext_array;
876   server_configuration_s *config = get_server_config_struct();
877
878
879   js_p->error_code = 0;
880
881   /*Do we have any local handles?*/
882   if (imm_p->handle_array_copies_local_count == 0)
883       return SM_ACTION_COMPLETE;
884
885
886   gossip_debug(GOSSIP_MIRROR_DEBUG,"Target handle: %llu\tTarget FS ID: %d\n"
887                                   ,llu(imm_p->metadata_handle),imm_p->fs_id);
888   gossip_debug(GOSSIP_MIRROR_DEBUG,"dfile count: %d\n",imm_p->dfile_count);
889   gossip_debug(GOSSIP_MIRROR_DEBUG,"stuffed size: %d\n"
890                                ,sm_p->target_object_attr->u.meta.stuffed_size);
891   gossip_debug(GOSSIP_MIRROR_DEBUG,"hint.flags: %llu\n"
892                             ,llu(sm_p->target_object_attr->u.meta.hint.flags));
893   gossip_debug(GOSSIP_MIRROR_DEBUG,"dfile array P: %p\n"
894                                 ,sm_p->target_object_attr->u.meta.dfile_array);
895
896   /*find local IO extent array for this file system for metadata host*/
897   ret = PINT_cached_config_get_server( imm_p->fs_id
898                                       ,config->host_id
899                                       ,PINT_SERVER_TYPE_IO
900                                       ,&data_handle_ext_array );
901   if (ret)
902   {
903      js_p->error_code = ret;
904      return SM_ACTION_COMPLETE;
905   }
906   for (i=0;i<data_handle_ext_array.extent_count;i++)
907   {
908       gossip_debug(GOSSIP_MIRROR_DEBUG,"Extent Range %d:\tfirst  %llu"
909                                        "\tlast  %llu\n"
910                             ,i,llu(data_handle_ext_array.extent_array[i].first)
911                            ,llu(data_handle_ext_array.extent_array[i].last)  );
912   }
913
914   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tvalue of copies: %d \tlocation:%p\n"
915                                   ,imm_p->copies,&imm_p->copies);
916
917
918   /*create local datahandles - will be used as destination handles for copies*/
919   ret = job_trove_dspace_create_list( imm_p->fs_id
920                                      ,imm_p->handle_array_copies_local
921                                      ,imm_p->handle_array_copies_local_count *
922                                       imm_p->copies
923                                      ,PVFS_TYPE_DATAFILE
924                                      ,NULL
925                                      ,TROVE_SYNC
926                                      ,smcb
927                                      ,0
928                                      ,js_p
929                                      ,&job_id
930                                      ,server_job_context
931                                      ,NULL );
932   return ret;
933} /*end action create_local_datahandles*/
934
935
936
937
938static PINT_sm_action create_remote_datahandles (struct PINT_smcb *smcb
939                                                ,job_status_s *js_p)
940{   
941   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing create_remote_datahandles...\n");
942   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
943   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
944   int ret = 0,i;
945   job_id_t job_id;
946
947   js_p->error_code = 0;
948
949
950   if (imm_p->handle_array_copies_remote_count == 0)
951       return SM_ACTION_COMPLETE;
952
953   int rows = imm_p->copies;
954   int cols = imm_p->handle_array_copies_remote_count;
955
956   imm_p->my_remote_servers = malloc(sizeof(char *) * rows * cols);
957   if (!imm_p->my_remote_servers)
958   {
959       gossip_lerr("Error allocating imm_p->my_remote_servers.\n");
960       js_p->error_code = -PVFS_ENOMEM;
961       return SM_ACTION_COMPLETE;
962   }
963   memset(imm_p->my_remote_servers,0,sizeof(char *) * rows * cols);
964
965   /*setup my_remote_servers[copy,remote#] = remote server name.  This will */
966   /*allow job_precreate_pool to return handle_array_copies_remote where    */
967   /*[copy,remote#] = remote handle.  We end up with a list of remotes for  */
968   /*each copy in original distribution order.                              */
969   for (i=0; i<(rows*cols); i++)
970   {
971       imm_p->my_remote_servers[i] = imm_p->remote_io_servers[i%cols];
972       gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tremote_io_servers[%d]:%s "
973                                        "my_remote_servers[%d]:%s\n"
974                                       ,(i%cols)
975                                       ,imm_p->remote_io_servers[i%cols]
976                                       ,i
977                                       ,imm_p->my_remote_servers[i]);
978   }
979
980  ret = job_precreate_pool_get_handles(imm_p->fs_id
981                                       ,rows*cols,
982                                       PVFS_TYPE_DATAFILE,
983                                       (const char **)imm_p->my_remote_servers
984                                       ,imm_p->handle_array_copies_remote
985                                       ,0
986                                       ,smcb
987                                       ,0
988                                       ,js_p
989                                       ,&job_id
990                                       ,server_job_context
991                                       ,NULL);
992   return ret;
993}/*end action create_remote_datahandles*/
994
995
996
997
998static PINT_sm_action setup_datahandle_copies (struct PINT_smcb *smcb
999                                              ,job_status_s *js_p)
1000{
1001   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing setup_datahandle_copies...\n");
1002   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
1003   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
1004   server_configuration_s *config = get_server_config_struct();
1005   int i,j,k;
1006   
1007
1008   js_p->error_code = 0;
1009
1010   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRemote destination handles:\n");
1011   int rows = imm_p->copies;
1012   int cols = imm_p->handle_array_copies_remote_count;
1013   for (i=0; i<(rows*cols); i++)
1014   {
1015       gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tRemote handle(%d):%llu\n"
1016                                    ,i
1017                                    ,llu(imm_p->handle_array_copies_remote[i]));
1018   }
1019
1020   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tLocal destination handles:\n");
1021   cols = imm_p->handle_array_copies_local_count;
1022   for (i=0; i<(rows*cols); i++)
1023   {
1024       gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tLocal handle(%d):%llu\n"
1025                                    ,i
1026                                    ,llu(imm_p->handle_array_copies_local[i]));
1027   }
1028
1029   for (i=0,j=0,k=0; i<(imm_p->io_servers_required * imm_p->copies); i++)
1030   {
1031      if ( strncmp(imm_p->io_servers[i%imm_p->io_servers_required]
1032                 ,config->host_id,SERVER_NAME_MAX-1) == 0 )
1033      {/*local*/
1034          memcpy(&imm_p->handle_array_copies[i]
1035                ,&imm_p->handle_array_copies_local[j],sizeof(PVFS_handle));
1036          j++;
1037      }
1038      else
1039      {/*remote*/
1040          memcpy(&imm_p->handle_array_copies[i]
1041                ,&imm_p->handle_array_copies_remote[k],sizeof(PVFS_handle));
1042          k++;
1043      }       
1044   }/*end for*/
1045
1046   for (i=0; i<(imm_p->io_servers_required * imm_p->copies); i++)
1047       gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle_array_copies[%d]:  %llu.\n"
1048                                       ,i
1049                                       ,llu(imm_p->handle_array_copies[i]));
1050   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tnumber of io servers required: %d\n"
1051                                   ,imm_p->io_servers_required);
1052
1053   /*create and initialize the writes_completed array*/
1054   imm_p->writes_completed = malloc(sizeof(struct writes_completed_s) * imm_p->dfile_count
1055                                                                      * imm_p->copies);
1056   if (!imm_p->writes_completed)
1057   {
1058      gossip_lerr("Unable to allocate imm_p->writes_completed.\n");
1059      js_p->error_code = -PVFS_ENOMEM;
1060      return SM_ACTION_COMPLETE;
1061   }
1062   for (i=0; i<(imm_p->dfile_count * imm_p->copies); i++)
1063   {
1064       imm_p->writes_completed[i].status = STATUS_INIT;
1065       PVFS_handle_clear(imm_p->writes_completed[i].handle);
1066   }
1067   //memset(imm_p->writes_completed,UINT64_HIGH,sizeof(PVFS_handle)
1068   //                                         * imm_p->dfile_count
1069   //                                         * imm_p->copies);
1070
1071   /*the retry count is used to monitor how many times we retry a write. */
1072   /*this value is incremented in the check_for_retries state.           */
1073   imm_p->retry_count = 0;
1074
1075   return SM_ACTION_COMPLETE;
1076}/*end action setup_datahandle_copies*/
1077
1078
1079static PINT_sm_action copy_data (struct PINT_smcb *smcb, job_status_s *js_p)
1080{   
1081   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing copy_data....\n");
1082   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
1083   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
1084   server_configuration_s *config = get_server_config_struct();
1085   filesystem_configuration_s *fs =
1086                                    PINT_config_find_fs_id(config,imm_p->fs_id);
1087   int ret = 0;
1088   int src,row,cols,i,index,wc;
1089
1090   char PVFS_handle_string[PVFS_HANDLE_STRING_LEN];
1091
1092   /*this variable helps to understand the logic better.  it is a redeclara- */
1093   /*tion of the one dimensional imm_p->handle_array_copies and can be ac-   */
1094   /*cessed as handle_array_copies[copies,dfile_count].                      */
1095   PVFS_handle *handle_array_copies[imm_p->copies];
1096   memset(handle_array_copies,0,sizeof(PVFS_handle) * imm_p->copies);
1097   ONE_DIM_TO_TWO_DIMS(imm_p->handle_array_copies
1098                      ,handle_array_copies
1099                      ,imm_p->copies,imm_p->io_servers_required
1100                      ,PVFS_handle);
1101
1102   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tone dim to two dims:\n");
1103   for (row=0; row<imm_p->copies; row++)
1104   {
1105       for (cols=0; cols<imm_p->io_servers_required; cols++)
1106       {
1107        memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN);
1108        PVFS_handle_unparse(handle_array_copies[row][cols],PVFS_handle_string);
1109        gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\thandle_array_copies[%d][%d] : "
1110                                         "%sn"
1111                                        ,row,cols
1112                                        ,PVFS_handle_string);
1113       }
1114   }
1115
1116   js_p->error_code = 0;
1117
1118   /*for each source handle[src], create a MIRROR request containing a set of */
1119   /*destination handles.                                                     */
1120   for (src=0; src<imm_p->dfile_count; src++)
1121   {
1122       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tWorking on src #%d\n",src);
1123
1124       /* writes_completed indicates the status of each copy for each source: */
1125       /* 0 ==> completed, 1 ==> incomplete, STATUS_INIT ==> initial state.   */
1126       /* If incomplete, the value stored is the handle is the destination    */
1127       /* handle.                                                             */
1128       for (row=src,cols=0; cols<imm_p->copies; cols++)
1129       {
1130           index = (imm_p->copies * row) + cols;
1131
1132           if ( imm_p->writes_completed[index].status == 1 ||
1133                imm_p->writes_completed[index].status == STATUS_INIT    )
1134               
1135           //if (   imm_p->writes_completed[index] > 0
1136           //        || imm_p->writes_completed[index] == UINT64_HIGH)
1137
1138              break;
1139       }
1140
1141       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tValue of cols is %d\n",cols);
1142       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tValue of imm_p->copies is %d.\n"
1143                                       ,imm_p->copies);
1144 
1145       /*if all copies for this source are zero ==> process next source.*/
1146       if (cols==imm_p->copies)
1147       {
1148          gossip_debug(GOSSIP_MIRROR_DEBUG,"\tThis source[%d] has all "
1149                                           "completed writes.\n"
1150                                          ,src);
1151          continue;
1152       }
1153
1154       struct PVFS_server_req *req = malloc(sizeof(struct PVFS_server_req));
1155       if (!req)
1156       {
1157           gossip_lerr("Unable to allocate PVFS_server_req.\n");
1158           js_p->error_code = -PVFS_ENOMEM;
1159           return SM_ACTION_COMPLETE;
1160       }
1161       memset(req,0,sizeof(struct PVFS_server_req));
1162
1163       req->u.mirror.dst_handle = malloc(sizeof(PVFS_handle) * imm_p->copies);
1164       if ( !req->u.mirror.dst_handle)
1165       {
1166            gossip_lerr("Unable to allocate mirror.dst_handle.\n");
1167            js_p->error_code = -PVFS_ENOMEM;
1168            return SM_ACTION_COMPLETE;
1169       }
1170       memset(req->u.mirror.dst_handle,0,sizeof(PVFS_handle) * imm_p->copies);
1171
1172       /*index into the writes_completed array for each destination handle*/
1173       req->u.mirror.wcIndex = malloc(sizeof(uint32_t) * imm_p->copies);
1174       if ( !req->u.mirror.wcIndex )
1175       {
1176            gossip_lerr("Unable to allocate mirror.wcIndex.\n");
1177            js_p->error_code = -PVFS_ENOMEM;
1178            return SM_ACTION_COMPLETE;
1179       }
1180       memset(req->u.mirror.wcIndex,0,sizeof(uint32_t) * imm_p->copies);
1181 
1182       req->op = PVFS_SERV_MIRROR;
1183       req->credentials = sm_p->req->credentials;
1184
1185       PVFS_handle_copy(req->u.mirror.src_handle,
1186                        imm_p->handle_array_base[src]);
1187
1188       
1189       /* In the initial state or when ALL writes have failed, get destination*/
1190       /* handles from handle_array_copies array.  Otherwise, use the handles */
1191       /* stored in the writes_completed array.                               */
1192       index = imm_p->copies*src; /*first copy for this source*/
1193       
1194       //if (imm_p->writes_completed[index] == UINT64_HIGH)
1195        if ( imm_p->writes_completed[index].status == STATUS_INIT )
1196       {
1197          /*handle_array_copies[copy,server#] is accessed as a two-dimensional*/
1198          /*array where a row represents a copy and columns represent the     */
1199          /*destination handles,in order of the original file distribution. We*/
1200          /*map the source handle[i], which is also in distribution order,    */
1201          /*to handle_arrray_copies[0,i+1],[1,i+2],..,[n-1,(i+y)-1], where n  */
1202          /*is the number of copies and y is the number of handles in one copy*/
1203          for (wc=0,row=0,cols=(src+1)%imm_p->io_servers_required;
1204               row < imm_p->copies;
1205               wc++,row++,cols=(cols+1)%imm_p->io_servers_required)
1206          {
1207            PVFS_handle_copy(req->u.mirror.dst_handle[row],
1208                             handle_array_copies[row][cols]);
1209            req->u.mirror.wcIndex[row]     = index + wc;
1210            req->u.mirror.dst_count++;
1211          }
1212       } else { /*status == 1*/
1213          for (row=src,cols=0,i=0; cols<imm_p->copies; cols++,i++)
1214          {
1215              index = (imm_p->copies*row) + cols;
1216              //if (imm_p->writes_completed[index] > 0)
1217              //if ( !PVFS_handle_is_null(imm_p->writes_completed[index]) )
1218                if ( imm_p->writes_completed[index].status > 0 )
1219              {
1220                 PVFS_handle_copy(req->u.mirror.dst_handle[i],
1221                                  imm_p->writes_completed[index].handle);
1222                 req->u.mirror.wcIndex[i] = index;
1223                 req->u.mirror.dst_count++;
1224              }
1225          }
1226       }
1227       req->u.mirror.fs_id         = imm_p->fs_id;
1228       req->u.mirror.dist          = imm_p->dist;
1229       req->u.mirror.src_server_nr = src;
1230       req->u.mirror.flow_type     = fs->flowproto;
1231       req->u.mirror.encoding      = fs->encoding;
1232
1233       memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN);
1234       PVFS_handle_unparse(req->u.mirror.src_handle,PVFS_handle_string);
1235       gossip_debug(GOSSIP_MIRROR_DEBUG,"\treq->: src:%s\tfs_id:%d"
1236                                        "\tdist name:%s\tsrc server_nr:%d\n"
1237                                       ,PVFS_handle_string
1238                                       ,req->u.mirror.fs_id
1239                                       ,req->u.mirror.dist->dist_name
1240                                       ,req->u.mirror.src_server_nr );
1241       for (i=0; i<req->u.mirror.dst_count; i++)
1242       {
1243           memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN);
1244           PVFS_handle_unparse(req->u.mirror.dst_handle[i],PVFS_handle_string);
1245           gossip_debug(GOSSIP_MIRROR_DEBUG,"\treq->dst_handle[%d] : %s\n"
1246                                           ,i
1247                                           ,PVFS_handle_string);
1248       }
1249       struct PINT_server_op *mirror_op =
1250                              malloc(sizeof(struct PINT_server_op));
1251       if (!mirror_op)
1252       {
1253           gossip_lerr("Error allocating mirror_op");
1254           js_p->error_code = -PVFS_ENOMEM;
1255           return SM_ACTION_COMPLETE;
1256       }
1257       memset(mirror_op,0,sizeof(struct PINT_server_op));
1258
1259       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tabout to allocate mirror_op...\n");
1260
1261       if (imm_p->bstream_array_base_local)
1262          req->u.mirror.bsize = imm_p->bstream_array_base_local[src];
1263       mirror_op->req = req;
1264       mirror_op->op  = req->op;
1265       mirror_op->addr = sm_p->addr;/*get addr for this server*/
1266
1267       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op->req(%p)\n"
1268                                       ,mirror_op->req);
1269
1270       if ( strncmp(imm_p->io_servers[src]
1271                   ,config->host_id
1272                   ,SERVER_NAME_MAX-1) == 0 )
1273       {
1274
1275           gossip_debug(GOSSIP_MIRROR_DEBUG,"Above SRC is local.\n");
1276           PINT_sm_push_frame(smcb, LOCAL_SRC, mirror_op);
1277       }
1278       else
1279       {
1280          /*setup msgpairarray call.  This msgpair represents a connection */
1281          /*between the meta server and a remote IO server.  The request   */
1282          /*for the remote IO server is PVFS_SERV_MIRROR, which will read  */
1283          /*data residing on that server and write it to a destination     */
1284          /*handle specified in the request.  The response returned from   */
1285          /*this msgpair will indicate if the copy was successful.         */
1286          gossip_debug(GOSSIP_MIRROR_DEBUG,"Above SRC is remote.\n");
1287
1288          PINT_sm_msgarray_op *msgarray_op = &(mirror_op->msgarray_op);
1289
1290          memset(msgarray_op,0,sizeof(PINT_sm_msgarray_op));
1291          msgarray_op->msgarray = &msgarray_op->msgpair;
1292          msgarray_op->count = 1;
1293          PINT_sm_msgpair_state *msg_p = &msgarray_op->msgpair;
1294
1295          msg_p->req        = *req;
1296          msg_p->fs_id      = req->u.mirror.fs_id;
1297          PVFS_handle_copy(msg_p->handle, req->u.mirror.src_handle);
1298          msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
1299          msg_p->comp_fn    = mirror_comp_fn;
1300
1301          /*setup msgarray parameters*/
1302          PINT_serv_init_msgarray_params(mirror_op,req->u.mirror.fs_id);
1303
1304          /*determine the BMI svr address for the source handle*/
1305          ret = PINT_cached_config_map_to_server(&msg_p->svr_addr
1306                                                ,msg_p->handle
1307                                                ,msg_p->fs_id );
1308          if (ret)
1309          {
1310              gossip_err("Failed to map address\n");
1311              js_p->error_code = -1;
1312              return SM_ACTION_COMPLETE;
1313          }
1314
1315          memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN);
1316          PVFS_handle_unparse(msg_p->handle,PVFS_handle_string);
1317          gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmsg_p->req.op:%d"
1318                                           "\tmsg_p->fs_id:%d"
1319                                           "\tmsg_p->handle:%s\n"
1320                                           ,msg_p->req.op
1321                                           ,msg_p->fs_id
1322                                           ,PVFS_handle_string );
1323
1324          PINT_sm_push_frame(smcb, REMOTE_SRC, mirror_op);
1325       }
1326   }/*end for (src)*/
1327
1328
1329   return SM_ACTION_COMPLETE;
1330}/*end action copy_data*/
1331
1332
1333
1334
1335static PINT_sm_action check_copy_results (struct PINT_smcb *smcb
1336                                         ,job_status_s *js_p)
1337{   
1338   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing check_copy_results....\n");
1339   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
1340   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
1341   int task_id, error_code, remaining, i, j, index;
1342   struct PINT_server_op       *mirror_op = NULL;
1343   struct PVFS_servresp_mirror *respmir   = NULL;
1344   struct PVFS_servreq_mirror  *reqmir    = NULL;   
1345   char PVFS_handle_string[PVFS_HANDLE_STRING_LEN] = {0};
1346
1347   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tjs_p->error_code:%d\n"
1348                                   ,js_p->error_code);
1349   js_p->error_code = 0;
1350
1351   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsm_p->op:%d\n",sm_p->op);
1352   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsmcb->base_frame:%d"
1353                                    "\tsmcb->frame_count:%d\n"
1354                                   ,smcb->base_frame,smcb->frame_count);
1355
1356   /*the pjmp should have pushed at least one frame*/
1357   assert(smcb->frame_count > (smcb->base_frame+1));
1358
1359  do {
1360       mirror_op=PINT_sm_pop_frame(smcb, &task_id, &error_code, &remaining);
1361       respmir = &(mirror_op->resp.u.mirror);
1362       reqmir  = &(mirror_op->req->u.mirror);
1363
1364       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op->op:%d"
1365                                        "\ttask_id:%d"
1366                                        "\terror_code:%d(%0x)"
1367                                        "\tremaining:%d\n"
1368                                       ,mirror_op->op
1369                                       ,task_id
1370                                       ,error_code,error_code
1371                                       ,remaining);
1372       memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN);
1373       PVFS_handle_unparse(respmir->src_handle,PVFS_handle_string);
1374       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tresp.src_handle:%s "
1375                                        "\tresp.src_server_nr:%d\n"
1376                                       ,PVFS_handle_string
1377                                       ,respmir->src_server_nr );
1378       for (i=0; i<respmir->dst_count; i++)
1379       {
1380           gossip_debug(GOSSIP_MIRROR_DEBUG
1381                                ,"\t\tbytes_written[%d]:%d\n"
1382                                 "\t\twrite_status_codde[%d]:%d\n"
1383                                ,i
1384                                ,respmir->bytes_written[i]
1385                                ,i
1386                                ,respmir->write_status_code[i]);
1387       }
1388
1389       /*if error_code != 0, then NONE of the writes requested in this msgpair*/
1390       /*array executed, so we do not need to check the individual status     */
1391       /*codes.                                                               */
1392       if (error_code)
1393       {
1394           js_p->error_code = error_code; /*respmir has no valid data in it.*/
1395       } else {
1396           /*check the write status for each destination associated with this */
1397           /*particular source handle.                                        */
1398           for (i=0; i<respmir->dst_count; i++)
1399           {
1400              if (js_p->error_code)
1401              {
1402                  gossip_debug(GOSSIP_MIRROR_DEBUG,"\tStatus already "
1403                                                   "established:%d(0x%0x)\n"
1404                                                  ,js_p->error_code
1405                                                  ,js_p->error_code);
1406              }
1407              else if (respmir->write_status_code[i])
1408              {
1409                  gossip_debug(GOSSIP_MIRROR_DEBUG,"\tStatus came from error_"
1410                                                   "code %d(0x%0x)\n"
1411                                                  ,error_code
1412                                                  ,error_code);
1413                  js_p->error_code = respmir->write_status_code[i];
1414              }
1415              else
1416              {
1417                  gossip_debug(GOSSIP_MIRROR_DEBUG,"\tStatus is still zero(%d)"
1418                                                   "\n"
1419                                                  ,js_p->error_code);
1420              }
1421           }/*end for*/
1422       }/*end if*/
1423
1424
1425       for (i=0; i<reqmir->dst_count; i++)
1426       {
1427           index = reqmir->wcIndex[i];
1428           if (   error_code == 0  /*this will short circuit if false*/
1429               && respmir->write_status_code[i] == 0)
1430           {
1431               //imm_p->writes_completed[index] = 0;
1432                imm_p->writes_completed[index].status = 0;
1433                PVFS_handle_clear(imm_p->writes_completed[index].handle);
1434           } else if (error_code == 0)
1435           {
1436                imm_p->writes_completed[index].status = 1;
1437                PVFS_handle_copy(imm_p->writes_completed[index].handle
1438                                ,reqmir->dst_handle[i]);
1439           } else
1440           {
1441               //imm_p->writes_completed[index] = UINT64_HIGH;
1442                imm_p->writes_completed[index].status = STATUS_INIT;
1443                PVFS_handle_clear(imm_p->writes_completed[index].handle);
1444           }
1445       }
1446
1447       switch(task_id)
1448       {
1449          case LOCAL_SRC:
1450          {
1451             gossip_debug(GOSSIP_MIRROR_DEBUG,
1452                                            "\tReturning from LOCAL call...\n");
1453             break;
1454          }
1455          case REMOTE_SRC:
1456          {
1457             /*the destory can be moved into cleanup_msgpairarray, if none of*/
1458             /*its values are needed in this function.                       */
1459             PINT_msgpairarray_destroy(&(mirror_op->msgarray_op));
1460             gossip_debug(GOSSIP_MIRROR_DEBUG,
1461                                            "\tReturning from REMOTE call .\n");
1462             break;
1463          }
1464       }/*end switch*/
1465
1466       /*cleanup request/response allocations for mirror request*/
1467       if (reqmir->dst_handle)
1468           free(reqmir->dst_handle);
1469       if (reqmir->wcIndex)
1470           free(reqmir->wcIndex);
1471       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op->req(%p)\n"
1472                                       ,mirror_op->req);
1473       free(mirror_op->req);
1474
1475       if (respmir->bytes_written)
1476          free(respmir->bytes_written);
1477       if (respmir->write_status_code)
1478          free(respmir->write_status_code);
1479
1480       free(mirror_op);
1481  } while (remaining > (smcb->base_frame+1));
1482
1483   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tfinal value of js_p->error_code:%d(%0x)\n"
1484                                   ,js_p->error_code, js_p->error_code);
1485
1486   /*if one of the writes failed, js_p->error_code will contain an error.*/
1487
1488   gossip_debug(GOSSIP_MIRROR_DEBUG,"\twrites_completed array[src,server#]:\n");
1489   for (i=0; i<imm_p->dfile_count; i++)
1490   {
1491     for (j=0; j<imm_p->copies; j++)
1492     {
1493       index = (imm_p->dfile_count * i) + j;
1494       char PVFS_handle_string[PVFS_HANDLE_STRING_LEN];
1495       PVFS_handle_unparse(imm_p->writes_completed[index].handle,PVFS_handle_string);
1496       gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\t[%d][%d]:%s\n"
1497                                       ,i,j
1498                                       ,PVFS_handle_string);
1499       //gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\t[%d][%d]:%llu\n"
1500       //                                ,i,j
1501       //                                ,llu(imm_p->writes_completed[index]));
1502     }
1503   }
1504
1505   return SM_ACTION_COMPLETE;
1506}/*end action check_copy_results*/
1507
1508
1509static PINT_sm_action check_for_retries (struct PINT_smcb *smcb
1510                                        ,job_status_s *js_p)
1511{   
1512   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing check_for_retries....\n");
1513   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
1514   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
1515   int i;
1516
1517   imm_p->retry_count++;
1518
1519   /*Have we hit the retry limit?*/
1520   if (imm_p->retry_count >= WRITE_RETRY_LIMIT)
1521   {
1522       js_p->error_code = 0;
1523       return SM_ACTION_COMPLETE;
1524   }
1525
1526   /*Are there any writes to retry?*/
1527   for (i=0; i<(imm_p->dfile_count * imm_p->copies); i++)
1528   {
1529       PVFS_handle_unparse(imm_p->writes_completed[i].handle,PVFS_handle_string);
1530       gossip_debug(GOSSIP_MIRROR_DEBUG,"\twrites_complete[%d]:%s\n"
1531                                       ,i,PVFS_handle_string);
1532       //if (imm_p->writes_completed[i] != 0)
1533        if ( !PVFS_handle_is_null(imm_p->writes_completed[i].handle) )
1534       {
1535          js_p->error_code = RETRY;
1536          return SM_ACTION_COMPLETE;
1537       }
1538   }
1539   
1540   js_p->error_code = 0;
1541   return SM_ACTION_COMPLETE;
1542}/*end state check_for_retries*/
1543
1544
1545
1546static PINT_sm_action store_mirror_info (struct PINT_smcb *smcb
1547                                        ,job_status_s *js_p)
1548{   
1549   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing store_mirror_info....\n");
1550   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
1551   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
1552   PVFS_handle *reorg_handles = NULL;
1553   int  key_count = 3;
1554   int ret = 0,i,j;
1555   job_id_t job_id;
1556
1557   js_p->error_code = 0;
1558
1559   /*put copy handles in proper distribution order*/
1560   reorg_handles = reorganize_copy_handles(imm_p);
1561   if (!reorg_handles)
1562   {
1563      gossip_lerr("Unable to create reorg_handles array.\n");
1564      js_p->error_code = -PVFS_ENOMEM;
1565      return SM_ACTION_COMPLETE;
1566   }
1567
1568   /*setup key/val pairs*/
1569   sm_p->keyval_count = key_count;
1570   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tvalue of copies:%d \tlocation:%p\n"
1571                                   ,imm_p->copies,&imm_p->copies);
1572
1573   sm_p->key_a = malloc(sizeof(PVFS_ds_keyval) * sm_p->keyval_count);
1574   sm_p->val_a = malloc(sizeof(PVFS_ds_keyval) * sm_p->keyval_count);
1575
1576   if (!sm_p->key_a || !sm_p->val_a)
1577      goto error_exit;
1578
1579   memset(sm_p->key_a,0,sizeof(PVFS_ds_keyval) * sm_p->keyval_count);
1580   memset(sm_p->val_a,0,sizeof(PVFS_ds_keyval) * sm_p->keyval_count);
1581
1582   /*setup user.pvfs2.mirror.handles*/
1583   i=0; assert(i<key_count);
1584   sm_p->key_a[i].buffer = malloc(sizeof(handle_key));
1585   if (!sm_p->key_a[i].buffer)
1586      goto error_exit;
1587   strcpy(sm_p->key_a[i].buffer,handle_key);
1588   sm_p->key_a[i].buffer_sz = sizeof(handle_key);
1589
1590   sm_p->val_a[i].buffer = reorg_handles;
1591   sm_p->val_a[i].buffer_sz = sizeof(PVFS_handle) * imm_p->dfile_count
1592                                                  * imm_p->copies;
1593
1594   /*setup user.pvfs2.mirror.copies*/
1595   i++; assert(i<key_count);
1596   sm_p->key_a[i].buffer = malloc(sizeof(copy_count_key));
1597   if (!sm_p->key_a[i].buffer)
1598      goto error_exit;
1599   strcpy(sm_p->key_a[i].buffer,copy_count_key);
1600   sm_p->key_a[i].buffer_sz = sizeof(copy_count_key);
1601
1602   sm_p->val_a[i].buffer = malloc(sizeof(imm_p->copies));
1603   if (!sm_p->val_a[i].buffer)
1604      goto error_exit;
1605   sm_p->val_a[i].buffer_sz = sizeof(imm_p->copies);
1606   memcpy(sm_p->val_a[i].buffer,&(imm_p->copies),sm_p->val_a[i].buffer_sz);
1607
1608   /*setup user.pvfs2.mirror.status*/
1609   i++; assert(i<key_count);
1610   sm_p->key_a[i].buffer = malloc(sizeof(status_key));
1611   if (!sm_p->key_a[i].buffer)
1612      goto error_exit;
1613   strcpy(sm_p->key_a[i].buffer,status_key);
1614   sm_p->key_a[i].buffer_sz = sizeof(status_key);
1615
1616   sm_p->val_a[i].buffer = malloc(sizeof(PVFS_error) * imm_p->dfile_count
1617                                                     * imm_p->copies);
1618   if (!sm_p->val_a[i].buffer)
1619      goto error_exit;
1620   sm_p->val_a[i].buffer_sz = sizeof(PVFS_error) * imm_p->dfile_count
1621                                                 * imm_p->copies;
1622   for (j=0; j<imm_p->dfile_count*imm_p->copies; j++)
1623   {
1624        //memcpy(sm_p->val_a[i].buffer,imm_p->writes_completed
1625        //                       ,sm_p->val_a[i].buffer_sz);
1626        memcpy(&((PVFS_error *)(sm_p->val_a[i].buffer))[j]
1627              ,&(imm_p->writes_completed[j].status)
1628              ,sizeof(PVFS_error));
1629   }
1630   /*verify inputs*/
1631   i=0; assert(i<key_count);
1632   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tVerify inputs BEFORE call to trove...\n");
1633   gossip_debug(GOSSIP_MIRROR_DEBUG,"\t%s\n",(char *)sm_p->key_a[i].buffer);
1634   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsize of buffer : %d\n"
1635                                   ,sm_p->key_a[i].buffer_sz);
1636   PVFS_handle *myHandle = (PVFS_handle *)sm_p->val_a[i].buffer;
1637   for (j=0; j<(imm_p->dfile_count*imm_p->copies); j++)
1638   {
1639       PVFS_handle_unparse(myHandle[j],PVFS_handle_string);
1640       gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\thandle(%d):%s\n"
1641       ,j
1642       ,PVFS_handle_string);
1643   }
1644
1645   i++; assert(i<key_count);
1646   int *myCount = (int *)sm_p->val_a[i].buffer;
1647   sm_p->val_a[i].buffer_sz = sizeof(int);
1648   gossip_debug(GOSSIP_MIRROR_DEBUG,"\t%s:%d \tpointer:%p \tbuffer size:%d\n"
1649                                   ,(char *)sm_p->key_a[i].buffer
1650                                   ,*myCount,myCount
1651                                   ,sm_p->val_a[i].buffer_sz);
1652   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tkey count:%d\n"
1653                                   ,sm_p->keyval_count);
1654
1655   i++; assert(i<key_count);
1656   PVFS_error *myStatus = (PVFS_error *)(sm_p->val_a[i].buffer);
1657   for (j=0; j<(imm_p->dfile_count * imm_p->copies); j++)
1658   {
1659       gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle(%d):status(%d)\n"
1660                                       ,j
1661                                       ,myStatus[j]);
1662   }
1663
1664   /*store keys*/
1665   ret = job_trove_keyval_write_list( imm_p->fs_id
1666                                     ,imm_p->metadata_handle
1667                                     ,sm_p->key_a
1668                                     ,sm_p->val_a
1669                                     ,sm_p->keyval_count
1670                                 ,TROVE_SYNC /*trove flags*/
1671                                     ,NULL
1672                                     ,smcb
1673                                     ,0
1674                                     ,js_p
1675                                     ,&job_id
1676                                     ,server_job_context
1677                                     ,NULL);
1678
1679   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tvalue of ret from call to trove : %d\n"
1680                                   ,ret);
1681
1682   i=0; assert(i<key_count);
1683   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tVerify inputs AFTER call to trove...\n");
1684   gossip_debug(GOSSIP_MIRROR_DEBUG,"\t%s\n",(char *)sm_p->key_a[i].buffer);
1685   for (j=0; j<(imm_p->dfile_count * imm_p->copies); j++)
1686   {
1687       PVFS_handle *myHandle = (PVFS_handle *)sm_p->val_a[i].buffer;
1688       memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN);
1689       PVFS_handle_unparse(myHandle[j],PVFS_handle_string);
1690       gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\thandle(%d):%s\n"
1691       ,j
1692       ,PVFS_handle_string);
1693   }
1694
1695   i++; assert(i<key_count);
1696   myCount = (int *)sm_p->val_a[i].buffer;
1697   gossip_debug(GOSSIP_MIRROR_DEBUG,"\t%s:%d \tpointer:%p\n"
1698                                   ,(char *)sm_p->key_a[i].buffer
1699                                   ,*myCount
1700                                   ,sm_p->val_a[i].buffer);
1701
1702
1703   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tvalue of ret from trove call : %d\n"
1704                                   ,ret);
1705   return (ret);
1706
1707error_exit:
1708   for (i=0; i<sm_p->keyval_count; i++)
1709   {
1710       if (sm_p->key_a && sm_p->key_a[i].buffer)
1711          free(sm_p->key_a[i].buffer);
1712       if (sm_p->val_a && sm_p->val_a[i].buffer)
1713          free(sm_p->val_a[i].buffer);
1714   }
1715
1716   if (sm_p->key_a)
1717      free(sm_p->key_a);
1718   if (sm_p->val_a)
1719      free(sm_p->val_a);
1720
1721   js_p->error_code = -PVFS_ENOMEM;
1722   return SM_ACTION_COMPLETE;
1723}/*end action store_mirror_info*/
1724
1725
1726
1727static PINT_sm_action check_store_job (struct PINT_smcb *smcb
1728                                      ,job_status_s *js_p)
1729{   
1730   gossip_debug(GOSSIP_MIRROR_DEBUG,
1731                      "Executing check_store_job....\n");
1732   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
1733   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
1734   int i;
1735 
1736
1737   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tjs_p->error_code:%d\n"
1738                                   ,js_p->error_code);
1739
1740   if (js_p->error_code)
1741   {
1742      PVFS_handle_unparse(imm_p->metadata_handle,PVFS_handle_string);
1743      gossip_err("Unable to store datahandles and number of copies for this "
1744                 "mirror operation.\n");
1745      gossip_err("\tMeta data handle is %s\n",PVFS_handle_string);
1746      return SM_ACTION_COMPLETE;
1747   }
1748
1749   /*release memory used in previous job call*/
1750   for (i=0; i<sm_p->keyval_count; i++)
1751   {
1752          free(sm_p->key_a[i].buffer);
1753          free(sm_p->val_a[i].buffer);
1754   }
1755   free(sm_p->key_a);
1756   free(sm_p->val_a);
1757   sm_p->key_a = sm_p->val_a = NULL;
1758   sm_p->keyval_count = 0;
1759
1760   js_p->error_code = 0;
1761
1762   return SM_ACTION_COMPLETE;
1763}/*end state check_store_job*/
1764
1765
1766
1767
1768static PINT_sm_action replace_remote_datahandle_objects(struct PINT_smcb *smcb
1769                                                       ,job_status_s *js_p)
1770{   
1771   gossip_debug(GOSSIP_MIRROR_DEBUG,
1772                      "Executing replace_remote_datahandle_objects....\n");
1773
1774   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
1775   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
1776   job_id_t job_id;
1777   int ret; 
1778   int tmpindex;
1779   PVFS_handle pool_handle;
1780
1781   js_p->error_code = 0;
1782
1783   if (!imm_p->handle_array_copies_remote)
1784   {
1785       gossip_debug(GOSSIP_MIRROR_DEBUG,"handle_array_copies_remote is "
1786                                        "null: %p\n"
1787                                        ,imm_p->handle_array_copies_remote);
1788       js_p->error_code = REPLACE_DONE;
1789       return SM_ACTION_COMPLETE;
1790   }
1791
1792   imm_p->handle_array_copies_remote_count--;
1793   if (imm_p->handle_array_copies_remote_count < 0)
1794   {
1795       js_p->error_code = REPLACE_DONE;
1796       return SM_ACTION_COMPLETE;
1797   }
1798
1799   tmpindex = imm_p->handle_array_copies_remote_count;
1800
1801   /* find the pool that this handle belongs to */
1802   ret = job_precreate_pool_lookup_server( imm_p->remote_io_servers[tmpindex],
1803                                           PVFS_TYPE_DATAFILE,
1804                                          imm_p->fs_id
1805                                          ,&pool_handle );
1806   if (ret < 0)
1807   {
1808      imm_p->handle_array_copies_remote_count++;
1809      js_p->error_code = ret;
1810      return SM_ACTION_COMPLETE;
1811   }
1812
1813   ret = job_precreate_pool_fill( pool_handle
1814                                 ,imm_p->fs_id
1815                                 ,&(imm_p->handle_array_copies_remote[tmpindex])
1816                                 ,1
1817                                 ,smcb
1818                                 ,0
1819                                 ,js_p
1820                                 ,&job_id
1821                                 ,server_job_context
1822                                 ,NULL );
1823
1824   return ret;
1825}/*end action replace_remote_datahandle_objects*/
1826
1827
1828
1829static PINT_sm_action remove_local_datahandle_objects(struct PINT_smcb *smcb
1830                                                     ,job_status_s *js_p)
1831{   
1832   gossip_debug(GOSSIP_MIRROR_DEBUG,
1833                             "Executing remove_local_datahandle_objects....\n");
1834 
1835   struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
1836   PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
1837   job_id_t job_id;
1838   int ret;
1839
1840   if (js_p->error_code)
1841       imm_p->saved_error_code = js_p->error_code;
1842
1843   js_p->error_code = 0;
1844
1845   if (!imm_p->handle_array_copies_local)
1846       return SM_ACTION_COMPLETE;
1847
1848
1849   ret = job_trove_dspace_remove_list( imm_p->fs_id
1850                                      ,imm_p->handle_array_copies_local
1851                                      ,NULL
1852                                      ,imm_p->handle_array_copies_local_count
1853                                      ,TROVE_SYNC
1854                                      ,smcb
1855                                      ,0
1856                                      ,js_p
1857                                      ,&job_id
1858                                      ,server_job_context
1859                                      ,NULL );
1860
1861   return ret;
1862}/*end action remove_local_datahandle_objects*/
1863
1864
1865
1866
1867static PINT_sm_action cleanup (struct PINT_smcb *smcb, job_status_s *js_p)
1868{   
1869   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing cleanup....\n");
1870
1871   struct PINT_server_op *sm_p = NULL;
1872   PINT_server_create_copies_op *imm_p = NULL;
1873   int i;
1874
1875   sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
1876   imm_p = &(sm_p->u.create_copies);
1877
1878   if (js_p->error_code == NOTHING_TO_DO)
1879       js_p->error_code = 0;
1880
1881   if (imm_p->my_remote_servers)
1882       free(imm_p->my_remote_servers);
1883
1884   if (imm_p->writes_completed)
1885       free(imm_p->writes_completed);
1886
1887   if (imm_p->handle_array_copies_local)
1888       free(imm_p->handle_array_copies_local);
1889
1890   if (imm_p->handle_array_copies_remote)
1891       free(imm_p->handle_array_copies_remote);
1892
1893   if (imm_p->remote_io_servers)
1894   {
1895       for (i=0;i<imm_p->remote_io_servers_count;i++)
1896            free(imm_p->remote_io_servers[i]);
1897       free(imm_p->remote_io_servers);
1898   }
1899
1900   if (imm_p->local_io_servers)
1901   {
1902      for (i=0; i<imm_p->local_io_servers_count; i++)
1903          free(imm_p->local_io_servers[i]);
1904      free(imm_p->local_io_servers);
1905   }
1906
1907   if (imm_p->handle_array_base)
1908       free(imm_p->handle_array_base);
1909
1910   if (imm_p->handle_array_base_local)
1911       free(imm_p->handle_array_base_local);
1912
1913   if (imm_p->handle_array_copies)
1914       free(imm_p->handle_array_copies);
1915
1916   if (imm_p->io_servers)
1917   {
1918      for (i=0;i<imm_p->io_servers_required;i++)
1919          free(imm_p->io_servers[i]);
1920      free(imm_p->io_servers);
1921   }
1922
1923   if (imm_p->ds_attr_a)
1924      free(imm_p->ds_attr_a);
1925
1926   if (imm_p->bstream_array_base_local)
1927      free(imm_p->bstream_array_base_local);
1928
1929   if (!js_p->error_code && imm_p->saved_error_code)
1930      js_p->error_code = imm_p->saved_error_code;
1931
1932   if (imm_p->dist && imm_p->dist->dist_name)
1933      free(imm_p->dist->dist_name);
1934   if (imm_p->dist)
1935      free(imm_p->dist);
1936
1937   gossip_debug(GOSSIP_MIRROR_DEBUG,"Leaving cleanup: error_code:%d.....\n"
1938                                   ,js_p->error_code);
1939
1940     
1941   return SM_ACTION_COMPLETE;
1942}/*end action cleanup*/
1943
1944
1945int mirror_comp_fn(void *v_p, struct PVFS_server_resp *resp_p, int i)
1946{
1947   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing mirror_comp_fn.....\n");
1948
1949   PINT_smcb *smcb = v_p;
1950   struct PINT_server_op *mirror_op =
1951                                   PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
1952   struct PVFS_servresp_mirror *respmir = &(mirror_op->resp.u.mirror);
1953   int k;
1954
1955   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op:%p\n",mirror_op);
1956
1957   /* only posted one msgpair per source handle*/
1958   assert(i==0);
1959
1960   /* If the response status is non-zero, then the rest of the response is  */
1961   /* NOT encoded in final-response.sm.  So, there are no values to access. */
1962   /* NOTE: An error code will be returned in the status field IFF NONE of  */
1963   /* the writes were successful.  Otherwise, the status of each write will */
1964   /* be contained in the write_status_code field.                          */
1965   if (resp_p->status != 0)
1966      return(resp_p->status);
1967
1968   memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN);
1969   PVFS_handle_unparse(resp_p->u.mirror.src_handle,PVFS_handle_string);
1970   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tresp->src_handle:%s "
1971                                    "\tresp->src_server_nr:%d "
1972                                    "\tresp->status:%d\n"
1973                                   ,PVFS_handle_string
1974                                   ,resp_p->u.mirror.src_server_nr
1975                                   ,resp_p->status );
1976   for (k=0; k<resp_p->u.mirror.dst_count; k++)
1977   {
1978       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tresp->bytes_written[%d]:%d"
1979                                        "\tresp->write_status_code[%d]:%d\n"
1980                                       ,k
1981                                       ,resp_p->u.mirror.bytes_written[k]
1982                                       ,k
1983                                       ,resp_p->u.mirror.write_status_code[k]);
1984   }
1985
1986   assert(mirror_op->op == PVFS_SERV_MIRROR);
1987
1988   memset(&(mirror_op->resp),0,sizeof(mirror_op->resp));
1989
1990   /*capture information from the mirror operation.*/
1991   PVFS_handle_copy(respmir->src_handle, resp_p->u.mirror.src_handle);
1992   respmir->src_server_nr = resp_p->u.mirror.src_server_nr;
1993   respmir->dst_count     = resp_p->u.mirror.dst_count;
1994
1995   respmir->bytes_written = malloc(sizeof(uint32_t) * respmir->dst_count);
1996   if (!respmir->bytes_written)
1997   {
1998      gossip_lerr("Unable to allocate respmir->bytes_written\n");
1999      return (-PVFS_ENOMEM);
2000   }
2001   memset(respmir->bytes_written,0,sizeof(uint32_t) * respmir->dst_count);
2002
2003   respmir->write_status_code = malloc(sizeof(uint32_t) * respmir->dst_count);
2004   if (!respmir->write_status_code)
2005   {
2006       gossip_lerr("Unable to allocate respmir->write_status_code.\n");
2007       return (-PVFS_ENOMEM);
2008   }
2009   memset(respmir->write_status_code,0,sizeof(uint32_t) * respmir->dst_count);
2010
2011   memcpy(respmir->bytes_written,resp_p->u.mirror.bytes_written
2012         ,sizeof(uint32_t) * respmir->dst_count);
2013   memcpy(respmir->write_status_code,resp_p->u.mirror.write_status_code
2014         ,sizeof(uint32_t) * respmir->dst_count);
2015
2016   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsmcb->base_frame:%d\tframe_count:%d\n"
2017                                   ,smcb->base_frame,smcb->frame_count);
2018
2019   return(0);
2020} /*end msgpair completion function mirror_comp_fn*/
2021
2022
2023
2024
2025static PVFS_handle *reorganize_copy_handles(
2026                    struct PINT_server_create_copies_op *imm_p)
2027{
2028   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing reorganize_copy_handles..\n");
2029
2030   uint64_t i,j,k,rows,in_cols,out_cols;
2031   PVFS_handle *copies_out = NULL;
2032   PVFS_handle *copies_in  = imm_p->handle_array_copies;
2033
2034
2035   rows     = imm_p->copies;
2036   in_cols  = imm_p->io_servers_required;
2037   out_cols = imm_p->dfile_count;
2038
2039   /* allocate copies_out array */
2040   copies_out = malloc(sizeof(PVFS_handle) * rows * out_cols);
2041   if (!copies_out)
2042   {
2043      gossip_lerr("Unable to allocate memeory.\n");
2044      return (NULL);
2045   }
2046   memset(copies_out,0,sizeof(PVFS_handle) * rows * out_cols);
2047
2048   for (i=0; i<(in_cols*rows); i++)
2049   {
2050       PVFS_handle_unparse(copies_in[i],PVFS_handle_string);
2051       gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle_array_copies(%d):%s\n"
2052                                       ,(int)i
2053                                       ,PVFS_handle_string);
2054   }
2055
2056   /*this code copies copies_in[n+1] to copies_out[n] within the same row*/
2057   /*each row represents one copy of the logical file, i.e., each of its */
2058   /*datahandles.                                                        */
2059   for (i=0,k=1; i<rows; i++,k++)
2060   {
2061       for (j=0; j<out_cols; j++)
2062       {
2063           PVFS_handle_copy(copies_out[(i*out_cols)+j],
2064                            copies_in[(i*in_cols)+((j+k)%in_cols)]);
2065       }
2066   }
2067   
2068   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tReorg'd Handles Array:\n");
2069   for (i=0; i<rows; i++)
2070   {
2071       for (j=0; j<out_cols; j++)
2072       {
2073           PVFS_handle_unparse(copies_out[(i*out_cols)+j],PVFS_handle_string);
2074           gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\t(%d,%d):%s\n"
2075                                ,(int)i,(int)j
2076                                ,PVFS_handle_string);
2077       }
2078   }
2079
2080   return(copies_out);
2081}/*end function reorganize_copy_handles*/
2082
2083static int get_server_names(PINT_server_create_copies_op *imm_p)
2084{
2085  char **list=NULL;
2086  int size=0
2087     ,i,j,ret;
2088
2089  gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing get_server_names....\n");
2090
2091  gossip_debug(GOSSIP_MIRROR_DEBUG,    "\tio_servers(before):\n");
2092  for (i=0; i<imm_p->io_servers_required; i++)
2093      gossip_debug(GOSSIP_MIRROR_DEBUG,"\t              [%d]:%s"
2094                                       "\tlength:%d\n"
2095                                      ,i
2096                                      ,imm_p->io_servers[i]
2097                                      ,(int)strlen(imm_p->io_servers[i]));
2098
2099
2100
2101  /*Get access to the io server names residing in the cache*/
2102  ret = PINT_cached_config_io_server_names(&list,&size,imm_p->fs_id);
2103  if (ret)
2104  {
2105      if (list)
2106         free(list);
2107      gossip_lerr("Unable to retrieve IO server names from the cache.\n");
2108      return(ret);
2109  }
2110
2111  gossip_debug(GOSSIP_MIRROR_DEBUG,"\tReturned from PINT_cached_config...\n");
2112  for (i=0; i<size; i++)
2113    gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tValue of      size:%d\n"
2114                                     "\t\t\t\tValue of   list[%d]:%p\n"
2115                                     "\t\t\t\tValue of  *list[%d]:%s\n"
2116                                     "\t\t\t\tstrlen of  list[%d]:%d\n\n"
2117                                    ,size
2118                                    ,i,list[i],i,list[i]
2119                                    ,i,(int)strlen(list[i]));
2120
2121  /*Remove server names that are already being used in the io_servers list*/
2122  for (i=0; i<size; i++)
2123  {
2124     for (j=0; j<imm_p->dfile_count; j++)
2125     {
2126        if (strncmp(list[i],imm_p->io_servers[j],strlen(list[i])) == 0)
2127        {
2128         list[i] = NULL;
2129         break;
2130        }
2131     }/*end for*/
2132  }/*end for*/
2133
2134  /*Add server names to io_servers list*/
2135  for (i=0,j=imm_p->dfile_count; i<size && j<imm_p->io_servers_required; i++)
2136  {
2137     if (list[i])
2138     {
2139        strncpy(imm_p->io_servers[j],list[i],SERVER_NAME_MAX-1);
2140        j++;
2141     }
2142  }/*end for*/
2143
2144  gossip_debug(GOSSIP_MIRROR_DEBUG,    "\tio_servers(after):\n");
2145  for (i=0; i<imm_p->io_servers_required; i++)
2146      gossip_debug(GOSSIP_MIRROR_DEBUG,"\t             [%d]:%s\n"
2147                                      ,i
2148                                      ,imm_p->io_servers[i]);
2149
2150  /*deallocate memory used for "list"*/
2151  free(list);
2152
2153  return (0);
2154}/*end function get_server_names*/
2155
2156/******************************************************************************/
2157/* Right now, this state machine is not called as a standalone request. It is */
2158/* only called as a nested machine from seteattr; however, when time comes to */
2159/* create a standalone server request, the values used for the request        */
2160/* parameters are listed below.                                               */
2161/******************************************************************************/
2162static inline int PINT_get_object_ref_copies( struct PVFS_server_req *req
2163                                             ,PVFS_fs_id *fs_id
2164                                             ,PVFS_handle *handle )
2165{
2166   *fs_id  = req->u.seteattr.fs_id;
2167   PVFS_handle_copy(*handle,  req->u.seteattr.handle);     
2168
2169    return 0;
2170};
2171
2172/*request parameters*/
2173struct PINT_server_req_params pvfs2_create_immutable_copies_params =
2174{
2175        .string_name = "create_immutable_copies",
2176        .perm = PINT_SERVER_CHECK_NONE,
2177        .access_type = PINT_server_req_modify,
2178        .sched_policy = PINT_SERVER_REQ_SCHEDULE,
2179        .get_object_ref = PINT_get_object_ref_copies,
2180        .state_machine = &pvfs2_create_immutable_copies_sm
2181};
2182
2183/****** E N D  O F  F I L E *******************************/
Note: See TracBrowser for help on using the browser.