root/branches/Orange-Branch/src/io/bmi/bmi_tcp/bmi-tcp.c @ 9093

Revision 9093, 116.5 KB (checked in by walt, 19 months ago)

initial working versionof usrint code
auto configures usrint on an ucache off (not working yet)
new acl repimplemented - old one has #def
some minor format edits

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/* TCP/IP implementation of a BMI method */
8
9#include <errno.h>
10#include <string.h>
11#include <unistd.h>
12#include <fcntl.h>
13#include <sys/poll.h>
14#include <netinet/tcp.h>
15#include <assert.h>
16#include <sys/uio.h>
17#include <time.h>
18#include <sys/time.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <arpa/inet.h>
22#include "pint-mem.h"
23
24#include "pvfs2-config.h"
25#ifdef HAVE_NETDB_H
26#include <netdb.h>
27#endif
28
29#include "bmi-method-support.h"
30#include "bmi-method-callback.h"
31#include "bmi-tcp-addressing.h"
32#ifdef __PVFS2_USE_EPOLL__
33#include "socket-collection-epoll.h"
34#else
35#include "socket-collection.h"
36#endif
37#include "op-list.h"
38#include "gossip.h"
39#include "sockio.h"
40#include "bmi-byteswap.h"
41#include "id-generator.h"
42#include "pint-event.h"
43#include "pvfs2-debug.h"
44#ifdef USE_TRUSTED
45#include "server-config.h"
46#include "bmi-tcp-addressing.h"
47#endif
48#include "gen-locks.h"
49#include "pint-hint.h"
50#include "pint-event.h"
51
52static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
53static gen_cond_t interface_cond = GEN_COND_INITIALIZER;
54static int sc_test_busy = 0;
55
56/* function prototypes */
57int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
58                       int method_id,
59                       int init_flags);
60int BMI_tcp_finalize(void);
61int BMI_tcp_set_info(int option,
62                     void *inout_parameter);
63int BMI_tcp_get_info(int option,
64                     void *inout_parameter);
65void *BMI_tcp_memalloc(bmi_size_t size,
66                       enum bmi_op_type send_recv);
67int BMI_tcp_memfree(void *buffer,
68                    bmi_size_t size,
69                    enum bmi_op_type send_recv);
70int BMI_tcp_unexpected_free(void *buffer);
71int BMI_tcp_post_send(bmi_op_id_t * id,
72                      bmi_method_addr_p dest,
73                      const void *buffer,
74                      bmi_size_t size,
75                      enum bmi_buffer_type buffer_type,
76                      bmi_msg_tag_t tag,
77                      void *user_ptr,
78                      bmi_context_id context_id,
79                      PVFS_hint hints);
80int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
81                                bmi_method_addr_p dest,
82                                const void *buffer,
83                                bmi_size_t size,
84                                enum bmi_buffer_type buffer_type,
85                                bmi_msg_tag_t tag,
86                                void *user_ptr,
87                                bmi_context_id context_id,
88                                PVFS_hint hints);
89int BMI_tcp_post_recv(bmi_op_id_t * id,
90                      bmi_method_addr_p src,
91                      void *buffer,
92                      bmi_size_t expected_size,
93                      bmi_size_t * actual_size,
94                      enum bmi_buffer_type buffer_type,
95                      bmi_msg_tag_t tag,
96                      void *user_ptr,
97                      bmi_context_id context_id,
98                      PVFS_hint hints);
99int BMI_tcp_test(bmi_op_id_t id,
100                 int *outcount,
101                 bmi_error_code_t * error_code,
102                 bmi_size_t * actual_size,
103                 void **user_ptr,
104                 int max_idle_time_ms,
105                 bmi_context_id context_id);
106int BMI_tcp_testsome(int incount,
107                     bmi_op_id_t * id_array,
108                     int *outcount,
109                     int *index_array,
110                     bmi_error_code_t * error_code_array,
111                     bmi_size_t * actual_size_array,
112                     void **user_ptr_array,
113                     int max_idle_time_ms,
114                     bmi_context_id context_id);
115int BMI_tcp_testunexpected(int incount,
116                           int *outcount,
117                           struct bmi_method_unexpected_info *info,
118                           int max_idle_time_ms);
119int BMI_tcp_testcontext(int incount,
120                     bmi_op_id_t * out_id_array,
121                     int *outcount,
122                     bmi_error_code_t * error_code_array,
123                     bmi_size_t * actual_size_array,
124                     void **user_ptr_array,
125                     int max_idle_time_ms,
126                     bmi_context_id context_id);
127bmi_method_addr_p BMI_tcp_method_addr_lookup(const char *id_string);
128const char* BMI_tcp_addr_rev_lookup_unexpected(bmi_method_addr_p map);
129int BMI_tcp_query_addr_range(bmi_method_addr_p, const char *, int);
130int BMI_tcp_post_send_list(bmi_op_id_t * id,
131                           bmi_method_addr_p dest,
132                           const void *const *buffer_list,
133                           const bmi_size_t *size_list,
134                           int list_count,
135                           bmi_size_t total_size,
136                           enum bmi_buffer_type buffer_type,
137                           bmi_msg_tag_t tag,
138                           void *user_ptr,
139                           bmi_context_id context_id,
140                           PVFS_hint hints);
141int BMI_tcp_post_recv_list(bmi_op_id_t * id,
142                           bmi_method_addr_p src,
143                           void *const *buffer_list,
144                           const bmi_size_t *size_list,
145                           int list_count,
146                           bmi_size_t total_expected_size,
147                           bmi_size_t * total_actual_size,
148                           enum bmi_buffer_type buffer_type,
149                           bmi_msg_tag_t tag,
150                           void *user_ptr,
151                           bmi_context_id context_id,
152                           PVFS_hint hints);
153int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
154                                     bmi_method_addr_p dest,
155                                     const void *const *buffer_list,
156                                     const bmi_size_t *size_list,
157                                     int list_count,
158                                     bmi_size_t total_size,
159                                     enum bmi_buffer_type buffer_type,
160                                     bmi_msg_tag_t tag,
161                                     void *user_ptr,
162                                     bmi_context_id context_id,
163                                     PVFS_hint hints);
164int BMI_tcp_open_context(bmi_context_id context_id);
165void BMI_tcp_close_context(bmi_context_id context_id);
166int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id);
167
168char BMI_tcp_method_name[] = "bmi_tcp";
169
170/* size of encoded message header */
171#define TCP_ENC_HDR_SIZE 24
172
173/* structure internal to tcp for use as a message header */
174struct tcp_msg_header
175{
176    uint32_t magic_nr;          /* magic number */
177    uint32_t mode;              /* eager, rendezvous, etc. */
178    bmi_msg_tag_t tag;          /* user specified message tag */
179    bmi_size_t size;            /* length of trailing message */
180    char enc_hdr[TCP_ENC_HDR_SIZE];  /* encoded version of header info */
181};
182
183#define BMI_TCP_ENC_HDR(hdr)                                            \
184    do {                                                                \
185        *((uint32_t*)&((hdr).enc_hdr[0])) = htobmi32((hdr).magic_nr);   \
186        *((uint32_t*)&((hdr).enc_hdr[4])) = htobmi32((hdr).mode);       \
187        *((uint64_t*)&((hdr).enc_hdr[8])) = htobmi64((hdr).tag);        \
188        *((uint64_t*)&((hdr).enc_hdr[16])) = htobmi64((hdr).size);      \
189    } while(0)                                             
190
191#define BMI_TCP_DEC_HDR(hdr)                                            \
192    do {                                                                \
193        (hdr).magic_nr = bmitoh32(*((uint32_t*)&((hdr).enc_hdr[0])));   \
194        (hdr).mode = bmitoh32(*((uint32_t*)&((hdr).enc_hdr[4])));       \
195        (hdr).tag = bmitoh64(*((uint64_t*)&((hdr).enc_hdr[8])));        \
196        (hdr).size = bmitoh64(*((uint64_t*)&((hdr).enc_hdr[16])));      \
197    } while(0)                                             
198
199/* enumerate states that we care about */
200enum bmi_tcp_state
201{
202    BMI_TCP_INPROGRESS,
203    BMI_TCP_BUFFERING,
204    BMI_TCP_COMPLETE
205};
206
207/* tcp private portion of operation structure */
208struct tcp_op
209{
210    struct tcp_msg_header env;  /* envelope for this message */
211    enum bmi_tcp_state tcp_op_state;
212    /* these two fields are used as place holders for the buffer
213     * list and size list when we really don't have lists (regular
214     * BMI_send or BMI_recv operations); it allows us to use
215     * generic code to handle both cases
216     */
217    void *buffer_list_stub;
218    bmi_size_t size_list_stub;
219};
220
221/* static io vector for use with readv and writev; we can only use
222 * this because BMI serializes module calls
223 */
224#define BMI_TCP_IOV_COUNT 10
225static struct iovec stat_io_vector[BMI_TCP_IOV_COUNT+1];
226
227/* internal utility functions */
228static int tcp_server_init(void);
229static void dealloc_tcp_method_addr(bmi_method_addr_p map);
230static int tcp_sock_init(bmi_method_addr_p my_method_addr);
231static int enqueue_operation(op_list_p target_list,
232                             enum bmi_op_type send_recv,
233                             bmi_method_addr_p map,
234                             void *const *buffer_list,
235                             const bmi_size_t *size_list,
236                             int list_count,
237                             bmi_size_t amt_complete,
238                             bmi_size_t env_amt_complete,
239                             bmi_op_id_t * id,
240                             int tcp_op_state,
241                             struct tcp_msg_header header,
242                             void *user_ptr,
243                             bmi_size_t actual_size,
244                             bmi_size_t expected_size,
245                             bmi_context_id context_id,
246                             int32_t event_id);
247static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code);
248static int tcp_shutdown_addr(bmi_method_addr_p map);
249static int tcp_do_work(int max_idle_time);
250static int tcp_do_work_error(bmi_method_addr_p map);
251static int tcp_do_work_recv(bmi_method_addr_p map, int* stall_flag);
252static int tcp_do_work_send(bmi_method_addr_p map, int* stall_flag);
253static int work_on_recv_op(method_op_p my_method_op,
254                           int *stall_flag);
255static int work_on_send_op(method_op_p my_method_op,
256                           int *blocked_flag, int* stall_flag);
257static int tcp_accept_init(int *socket, char** peer);
258static method_op_p alloc_tcp_method_op(void);
259static void dealloc_tcp_method_op(method_op_p old_op);
260static int handle_new_connection(bmi_method_addr_p map);
261static int tcp_post_send_generic(bmi_op_id_t * id,
262                                 bmi_method_addr_p dest,
263                                 const void *const *buffer_list,
264                                 const bmi_size_t *size_list,
265                                 int list_count,
266                                 enum bmi_buffer_type buffer_type,
267                                 struct tcp_msg_header my_header,
268                                 void *user_ptr,
269                                 bmi_context_id context_id,
270                                 PVFS_hint hints);
271static int tcp_post_recv_generic(bmi_op_id_t * id,
272                                 bmi_method_addr_p src,
273                                 void *const *buffer_list,
274                                 const bmi_size_t *size_list,
275                                 int list_count,
276                                 bmi_size_t expected_size,
277                                 bmi_size_t * actual_size,
278                                 enum bmi_buffer_type buffer_type,
279                                 bmi_msg_tag_t tag,
280                                 void *user_ptr,
281                                 bmi_context_id context_id,
282                                 PVFS_hint hints);
283static int payload_progress(int s, void *const *buffer_list, const bmi_size_t*
284    size_list, int list_count, bmi_size_t total_size, int* list_index,
285    bmi_size_t* current_index_complete, enum bmi_op_type send_recv,
286    char* enc_hdr, bmi_size_t* env_amt_complete);
287
288#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
289static int tcp_enable_trusted(struct tcp_addr *tcp_addr_data);
290#endif
291#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
292static int tcp_allow_trusted(struct sockaddr_in *peer_sockaddr);
293#endif
294
295static void bmi_set_sock_buffers(int socket);
296
297/* exported method interface */
298const struct bmi_method_ops bmi_tcp_ops = {
299    .method_name = BMI_tcp_method_name,
300    .initialize = BMI_tcp_initialize,
301    .finalize = BMI_tcp_finalize,
302    .set_info = BMI_tcp_set_info,
303    .get_info = BMI_tcp_get_info,
304    .memalloc = BMI_tcp_memalloc,
305    .memfree  = BMI_tcp_memfree,
306    .unexpected_free = BMI_tcp_unexpected_free,
307    .post_send = BMI_tcp_post_send,
308    .post_sendunexpected = BMI_tcp_post_sendunexpected,
309    .post_recv = BMI_tcp_post_recv,
310    .test = BMI_tcp_test,
311    .testsome = BMI_tcp_testsome,
312    .testcontext = BMI_tcp_testcontext,
313    .testunexpected = BMI_tcp_testunexpected,
314    .method_addr_lookup = BMI_tcp_method_addr_lookup,
315    .post_send_list = BMI_tcp_post_send_list,
316    .post_recv_list = BMI_tcp_post_recv_list,
317    .post_sendunexpected_list = BMI_tcp_post_sendunexpected_list,
318    .open_context = BMI_tcp_open_context,
319    .close_context = BMI_tcp_close_context,
320    .cancel = BMI_tcp_cancel,
321    .rev_lookup_unexpected = BMI_tcp_addr_rev_lookup_unexpected,
322    .query_addr_range = BMI_tcp_query_addr_range,
323};
324
325/* module parameters */
326static struct
327{
328    int method_flags;
329    int method_id;
330    bmi_method_addr_p listen_addr;
331} tcp_method_params;
332
333#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
334static struct tcp_allowed_connection_s *gtcp_allowed_connection = NULL;
335#endif
336
337static int check_unexpected = 1;
338
339/* op_list_array indices */
340enum
341{
342    NUM_INDICES = 5,
343    IND_SEND = 0,
344    IND_RECV = 1,
345    IND_RECV_INFLIGHT = 2,
346    IND_RECV_EAGER_DONE_BUFFERING = 3,
347    IND_COMPLETE_RECV_UNEXP = 4,        /* MAKE THIS COMES LAST */
348};
349
350/* internal operation lists */
351static op_list_p op_list_array[6] = { NULL, NULL, NULL, NULL,
352    NULL, NULL
353};
354
355/* internal completion queues */
356static op_list_p completion_array[BMI_MAX_CONTEXTS] = { NULL };
357
358/* internal socket collection */
359static socket_collection_p tcp_socket_collection_p = NULL;
360
361/* tunable parameters */
362enum
363{
364    /* amount of pending connections we'll allow */
365    TCP_BACKLOG = 256,
366    /* amount of work to be done during a test.  This roughly
367     * translates into the number of sockets that we will perform
368     * nonblocking operations on during one function call.
369     */
370    TCP_WORK_METRIC = 128
371};
372
373/* TCP message modes */
374enum
375{
376    TCP_MODE_IMMED = 1,         /* not used for TCP/IP */
377    TCP_MODE_UNEXP = 2,
378    TCP_MODE_EAGER = 4,
379    TCP_MODE_REND = 8
380};
381
382/* Allowable sizes for each mode */
383enum
384{
385    TCP_MODE_EAGER_LIMIT = 16384,       /* 16K */
386    TCP_MODE_REND_LIMIT = 16777216      /* 16M */
387};
388
389/* toggles cancel mode; for bmi_tcp this will result in socket being closed
390 * in all cancellation cases
391 */
392static int forceful_cancel_mode = 0;
393
394/*
395  Socket buffer sizes, currently these default values will be used
396  for the clients... (TODO)
397 */
398static int tcp_buffer_size_receive = 0;
399static int tcp_buffer_size_send = 0;
400
401static PINT_event_type bmi_tcp_send_event_id;
402static PINT_event_type bmi_tcp_recv_event_id;
403
404static PINT_event_group bmi_tcp_event_group;
405static pid_t bmi_tcp_pid;
406
407/*************************************************************************
408 * Visible Interface
409 */
410
411/* BMI_tcp_initialize()
412 *
413 * Initializes the tcp method.  Must be called before any other tcp
414 * method functions.
415 *
416 * returns 0 on success, -errno on failure
417 */
418int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
419                       int method_id,
420                       int init_flags)
421{
422
423    int ret = -1;
424    int tmp_errno = bmi_tcp_errno_to_pvfs(-ENOSYS);
425    struct tcp_addr *tcp_addr_data = NULL;
426    int i = 0;
427
428    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Initializing TCP/IP module.\n");
429
430    /* check args */
431    if ((init_flags & BMI_INIT_SERVER) && !listen_addr)
432    {
433        gossip_lerr("Error: bad parameters given to TCP/IP module.\n");
434        return (bmi_tcp_errno_to_pvfs(-EINVAL));
435    }
436
437    gen_mutex_lock(&interface_mutex);
438
439    /* zero out our parameter structure and fill it in */
440    memset(&tcp_method_params, 0, sizeof(tcp_method_params));
441    tcp_method_params.method_id = method_id;
442    tcp_method_params.method_flags = init_flags;
443
444    if (init_flags & BMI_INIT_SERVER)
445    {
446        /* hang on to our local listening address if needed */
447        tcp_method_params.listen_addr = listen_addr;
448        /* and initialize server functions */
449        ret = tcp_server_init();
450        if (ret < 0)
451        {
452            tmp_errno = bmi_tcp_errno_to_pvfs(ret);
453            gossip_err("Error: tcp_server_init() failure.\n");
454            goto initialize_failure;
455        }
456    }
457
458    /* set up the operation lists */
459    for (i = 0; i < NUM_INDICES; i++)
460    {
461        op_list_array[i] = op_list_new();
462        if (!op_list_array[i])
463        {
464            tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
465            goto initialize_failure;
466        }
467    }
468
469    /* set up the socket collection */
470    if (tcp_method_params.method_flags & BMI_INIT_SERVER)
471    {
472        tcp_addr_data = tcp_method_params.listen_addr->method_data;
473        tcp_socket_collection_p = BMI_socket_collection_init(tcp_addr_data->socket);
474    }
475    else
476    {
477        tcp_socket_collection_p = BMI_socket_collection_init(-1);
478    }
479
480    if (!tcp_socket_collection_p)
481    {
482        tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
483        goto initialize_failure;
484    }
485
486    bmi_tcp_pid = getpid();
487    PINT_event_define_group("bmi_tcp", &bmi_tcp_event_group);
488
489    /* Define the send event:
490     *   START: (client_id, request_id, rank, handle, op_id, send_size)
491     *   STOP: (size_sent)
492     */
493    PINT_event_define_event(
494        &bmi_tcp_event_group,
495#ifdef __PVFS2_SERVER__
496        "bmi_server_send",
497#else
498        "bmi_client_send",
499#endif
500        "%d%d%d%llu%d%d",
501        "%d", &bmi_tcp_send_event_id);
502
503    /* Define the recv event:
504     *   START: (client_id, request_id, rank, handle, op_id, recv_size)
505     *   STOP: (size_received)
506     */
507    PINT_event_define_event(
508        &bmi_tcp_event_group,
509#ifdef __PVFS2_SERVER__
510        "bmi_server_recv",
511#else
512        "bmi_client_recv",
513#endif
514        "%d%d%d%llu%d%d",
515        "%d", &bmi_tcp_recv_event_id);
516
517    gen_mutex_unlock(&interface_mutex);
518    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
519                  "TCP/IP module successfully initialized.\n");
520    return (0);
521
522  initialize_failure:
523
524    /* cleanup data structures and bail out */
525    for (i = 0; i < NUM_INDICES; i++)
526    {
527        if (op_list_array[i])
528        {
529            op_list_cleanup(op_list_array[i]);
530        }
531    }
532    if (tcp_socket_collection_p)
533    {
534        BMI_socket_collection_finalize(tcp_socket_collection_p);
535    }
536    gen_mutex_unlock(&interface_mutex);
537    return (tmp_errno);
538}
539
540
541/* BMI_tcp_finalize()
542 *
543 * Shuts down the tcp method.
544 *
545 * returns 0 on success, -errno on failure
546 */
547int BMI_tcp_finalize(void)
548{
549    int i = 0;
550
551    gen_mutex_lock(&interface_mutex);
552
553    /* shut down our listen addr, if we have one */
554    if ((tcp_method_params.method_flags & BMI_INIT_SERVER)
555        && tcp_method_params.listen_addr)
556    {
557        dealloc_tcp_method_addr(tcp_method_params.listen_addr);
558    }
559
560    /* note that this forcefully shuts down operations */
561    for (i = 0; i < NUM_INDICES; i++)
562    {
563        if (op_list_array[i])
564        {
565            op_list_cleanup(op_list_array[i]);
566            op_list_array[i] = NULL;
567        }
568    }
569
570    /* get rid of socket collection */
571    if (tcp_socket_collection_p)
572    {
573        BMI_socket_collection_finalize(tcp_socket_collection_p);
574        tcp_socket_collection_p = NULL;
575    }
576
577    /* NOTE: we are trusting the calling BMI layer to deallocate
578     * all of the method addresses (this will close any open sockets)
579     */
580    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "TCP/IP module finalized.\n");
581    gen_mutex_unlock(&interface_mutex);
582    return (0);
583}
584
585
586/*
587 * BMI_tcp_method_addr_lookup()
588 *
589 * resolves the string representation of an address into a method
590 * address structure. 
591 *
592 * returns a pointer to method_addr on success, NULL on failure
593 */
594bmi_method_addr_p BMI_tcp_method_addr_lookup(const char *id_string)
595{
596    char *tcp_string = NULL;
597    char *delim = NULL;
598    char *hostname = NULL;
599    bmi_method_addr_p new_addr = NULL;
600    struct tcp_addr *tcp_addr_data = NULL;
601    int ret = -1;
602
603    tcp_string = string_key("tcp", id_string);
604    if (!tcp_string)
605    {
606        /* the string doesn't even have our info */
607        return (NULL);
608    }
609
610    /* start breaking up the method information */
611    /* for normal tcp, it is simply hostname:port */
612    if ((delim = index(tcp_string, ':')) == NULL)
613    {
614        gossip_lerr("Error: malformed tcp address.\n");
615        free(tcp_string);
616        return (NULL);
617    }
618
619    /* looks ok, so let's build the method addr structure */
620    new_addr = alloc_tcp_method_addr();
621    if (!new_addr)
622    {
623        free(tcp_string);
624        return (NULL);
625    }
626    tcp_addr_data = new_addr->method_data;
627
628    ret = sscanf((delim + 1), "%d", &(tcp_addr_data->port));
629    if (ret != 1)
630    {
631        gossip_lerr("Error: malformed tcp address.\n");
632        dealloc_tcp_method_addr(new_addr);
633        free(tcp_string);
634        return (NULL);
635    }
636
637    hostname = (char *) malloc((delim - tcp_string + 1));
638    if (!hostname)
639    {
640        dealloc_tcp_method_addr(new_addr);
641        free(tcp_string);
642        return (NULL);
643    }
644    strncpy(hostname, tcp_string, (delim - tcp_string));
645    hostname[delim - tcp_string] = '\0';
646
647    tcp_addr_data->hostname = hostname;
648
649    free(tcp_string);
650    return (new_addr);
651}
652
653
654/* BMI_tcp_memalloc()
655 *
656 * Allocates memory that can be used in native mode by tcp.
657 *
658 * returns 0 on success, -errno on failure
659 */
660void *BMI_tcp_memalloc(bmi_size_t size,
661                       enum bmi_op_type send_recv)
662{
663    /* we really don't care what flags the caller uses, TCP/IP has no
664     * preferences about how the memory should be configured.
665     */
666
667/*    return (calloc(1,(size_t) size)); */
668    return PINT_mem_aligned_alloc(size, 4096);
669}
670
671
672/* BMI_tcp_memfree()
673 *
674 * Frees memory that was allocated with BMI_tcp_memalloc()
675 *
676 * returns 0 on success, -errno on failure
677 */
678int BMI_tcp_memfree(void *buffer,
679                    bmi_size_t size,
680                    enum bmi_op_type send_recv)
681{
682    PINT_mem_aligned_free(buffer);
683    return (0);
684}
685
686/* BMI_tcp_unexpected_free()
687 *
688 * Frees memory that was returned from BMI_tcp_test_unexpected()
689 *
690 * returns 0 on success, -errno on failure
691 */
692int BMI_tcp_unexpected_free(void *buffer)
693{
694    if (buffer)
695    {
696        free(buffer);
697    }
698    return (0);
699}
700
701#ifdef USE_TRUSTED
702
703static struct tcp_allowed_connection_s *
704alloc_trusted_connection_info(int network_count)
705{
706    struct tcp_allowed_connection_s *tcp_allowed_connection_info = NULL;
707
708    tcp_allowed_connection_info = (struct tcp_allowed_connection_s *)
709            calloc(1, sizeof(struct tcp_allowed_connection_s));
710    if (tcp_allowed_connection_info)
711    {
712        tcp_allowed_connection_info->network =
713            (struct in_addr *) calloc(network_count, sizeof(struct in_addr));
714        if (tcp_allowed_connection_info->network == NULL)
715        {
716            free(tcp_allowed_connection_info);
717            tcp_allowed_connection_info = NULL;
718        }
719        else
720        {
721            tcp_allowed_connection_info->netmask =
722                (struct in_addr *) calloc(network_count, sizeof(struct in_addr));
723            if (tcp_allowed_connection_info->netmask == NULL)
724            {
725                free(tcp_allowed_connection_info->network);
726                free(tcp_allowed_connection_info);
727                tcp_allowed_connection_info = NULL;
728            }
729            else {
730                tcp_allowed_connection_info->network_count = network_count;
731            }
732        }
733    }
734    return tcp_allowed_connection_info;
735}
736
737static void
738dealloc_trusted_connection_info(void* ptcp_allowed_connection_info)
739{
740    struct tcp_allowed_connection_s *tcp_allowed_connection_info =
741        (struct tcp_allowed_connection_s *) ptcp_allowed_connection_info;
742    if (tcp_allowed_connection_info)
743    {
744        free(tcp_allowed_connection_info->network);
745        tcp_allowed_connection_info->network = NULL;
746        free(tcp_allowed_connection_info->netmask);
747        tcp_allowed_connection_info->netmask = NULL;
748        free(tcp_allowed_connection_info);
749    }
750    return;
751}
752
753#endif
754
755/*
756 * This function will convert a mask_bits value to an in_addr
757 * representation. i.e for example if
758 * mask_bits was 24 then it would be 255.255.255.0
759 * if mask_bits was 22 then it would be 255.255.252.0
760 * etc
761 */
762static void convert_mask(int mask_bits, struct in_addr *mask)
763{
764   uint32_t addr = -1;
765   addr = addr & ~~(-1 << (mask_bits ? (32 - mask_bits) : 32));
766   mask->s_addr = htonl(addr);
767   return;
768}
769
770/* BMI_tcp_set_info()
771 *
772 * Pass in optional parameters.
773 *
774 * returns 0 on success, -errno on failure
775 */
776int BMI_tcp_set_info(int option,
777                     void *inout_parameter)
778{
779    int ret = -1;
780    bmi_method_addr_p tmp_addr = NULL;
781
782    gen_mutex_lock(&interface_mutex);
783
784    switch (option)
785    {
786    case BMI_TCP_BUFFER_SEND_SIZE:
787       tcp_buffer_size_send = *((int *)inout_parameter);
788       ret = 0;
789#ifdef __PVFS2_SERVER__
790       /* Set the default socket buffer sizes for the server socket */
791       bmi_set_sock_buffers(
792           ((struct tcp_addr *)
793            tcp_method_params.listen_addr->method_data)->socket);
794#endif
795       break;
796    case BMI_TCP_BUFFER_RECEIVE_SIZE:
797       tcp_buffer_size_receive = *((int *)inout_parameter);
798       ret = 0;
799#ifdef __PVFS2_SERVER__
800       /* Set the default socket buffer sizes for the server socket */
801       bmi_set_sock_buffers(
802           ((struct tcp_addr *)
803            tcp_method_params.listen_addr->method_data)->socket);
804#endif
805       break;
806    case BMI_TCP_CLOSE_SOCKET:
807        /* this should no longer make it to the bmi_tcp method; see bmi.c */
808        ret = 0;
809        break;
810    case BMI_FORCEFUL_CANCEL_MODE:
811        forceful_cancel_mode = 1;
812        ret = 0;
813        break;
814    case BMI_DROP_ADDR:
815        if (inout_parameter == NULL)
816        {
817            ret = bmi_tcp_errno_to_pvfs(-EINVAL);
818        }
819        else
820        {
821            tmp_addr = (bmi_method_addr_p) inout_parameter;
822            /* take it out of the socket collection */
823            tcp_forget_addr(tmp_addr, 1, 0);
824            ret = 0;
825        }
826        break;
827#ifdef USE_TRUSTED
828    case BMI_TRUSTED_CONNECTION:
829    {
830        struct tcp_allowed_connection_s *tcp_allowed_connection = NULL;
831        if (inout_parameter == NULL)
832        {
833            ret = bmi_tcp_errno_to_pvfs(-EINVAL);
834            break;
835        }
836        else
837        {
838            int    bmi_networks_count = 0;
839            char **bmi_networks = NULL;
840            int   *bmi_netmasks = NULL;
841            struct server_configuration_s *svc_config = NULL;
842
843            svc_config = (struct server_configuration_s *) inout_parameter;
844            tcp_allowed_connection = alloc_trusted_connection_info(svc_config->allowed_networks_count);
845            if (tcp_allowed_connection == NULL)
846            {
847                ret = bmi_tcp_errno_to_pvfs(-ENOMEM);
848                break;
849            }
850#ifdef      __PVFS2_SERVER__
851            gtcp_allowed_connection = tcp_allowed_connection;
852#endif
853            /* Stash this in the server_configuration_s structure. freed later on */
854            svc_config->security = tcp_allowed_connection;
855            svc_config->security_dtor = &dealloc_trusted_connection_info;
856            ret = 0;
857            /* Fill up the list of allowed ports */
858            PINT_config_get_allowed_ports(svc_config,
859                    &tcp_allowed_connection->port_enforce,
860                    tcp_allowed_connection->ports);
861
862            /* if it was enabled, make sure that we know how to deal with it */
863            if (tcp_allowed_connection->port_enforce == 1)
864            {
865                /* illegal ports */
866                if (tcp_allowed_connection->ports[0] > 65535
867                        || tcp_allowed_connection->ports[1] > 65535
868                        || tcp_allowed_connection->ports[1] < tcp_allowed_connection->ports[0])
869                {
870                    gossip_lerr("Error: illegal trusted port values\n");
871                    ret = bmi_tcp_errno_to_pvfs(-EINVAL);
872                    /* don't enforce anything! */
873                    tcp_allowed_connection->port_enforce = 0;
874                }
875            }
876            ret = 0;
877            /* Retrieve the list of BMI network addresses and masks  */
878            PINT_config_get_allowed_networks(svc_config,
879                    &tcp_allowed_connection->network_enforce,
880                    &bmi_networks_count,
881                    &bmi_networks,
882                    &bmi_netmasks);
883
884            /* if it was enabled, make sure that we know how to deal with it */
885            if (tcp_allowed_connection->network_enforce == 1)
886            {
887                int i;
888
889                for (i = 0; i < bmi_networks_count; i++)
890                {
891                    char *tcp_string = NULL;
892                    /* Convert the network string into an in_addr_t structure */
893                    tcp_string = string_key("tcp", bmi_networks[i]);
894                    if (!tcp_string)
895                    {
896                        /* the string doesn't even have our info */
897                        gossip_lerr("Error: malformed tcp network address\n");
898                        ret = bmi_tcp_errno_to_pvfs(-EINVAL);
899                    }
900                    else {
901                        /* convert this into an in_addr_t */
902                        inet_aton(tcp_string, &tcp_allowed_connection->network[i]);
903                        free(tcp_string);
904                    }
905                    convert_mask(bmi_netmasks[i], &tcp_allowed_connection->netmask[i]);
906                }
907                /* don't enforce anything if there were any errors */
908                if (ret != 0)
909                {
910                    tcp_allowed_connection->network_enforce = 0;
911                }
912            }
913        }
914        break;
915    }
916#endif
917    case BMI_TCP_CHECK_UNEXPECTED:
918    {
919        check_unexpected = *(int *)inout_parameter;
920        ret = 0;
921        break;
922    }
923
924    default:
925        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
926                      "TCP hint %d not implemented.\n", option);
927        ret = 0;
928        break;
929    }
930
931    gen_mutex_unlock(&interface_mutex);
932    return (ret);
933}
934
935/* BMI_tcp_get_info()
936 *
937 * Query for optional parameters.
938 *
939 * returns 0 on success, -errno on failure
940 */
941int BMI_tcp_get_info(int option,
942                     void *inout_parameter)
943{
944    struct method_drop_addr_query* query;
945    struct tcp_addr* tcp_addr_data;
946    int ret = 0;
947
948    gen_mutex_lock(&interface_mutex);
949
950    switch (option)
951    {
952    case BMI_CHECK_MAXSIZE:
953        *((int *) inout_parameter) = TCP_MODE_REND_LIMIT;
954        ret = 0;
955        break;
956    case BMI_DROP_ADDR_QUERY:
957        query = (struct method_drop_addr_query*)inout_parameter;
958        tcp_addr_data=query->addr->method_data;
959        /* only suggest that we discard the address if we have experienced
960         * an error and there is no way to reconnect
961         */
962        if(tcp_addr_data->addr_error != 0 &&
963           tcp_addr_data->dont_reconnect == 1)
964        {
965            query->response = 1;
966        }
967        else
968        {
969            query->response = 0;
970        }
971        ret = 0;
972        break;
973    case BMI_GET_UNEXP_SIZE:
974        *((int *) inout_parameter) = TCP_MODE_EAGER_LIMIT;
975        ret = 0;
976        break;
977
978    default:
979        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
980                      "TCP hint %d not implemented.\n", option);
981        ret = -ENOSYS;
982        break;
983    }
984
985    gen_mutex_unlock(&interface_mutex);
986    return (ret < 0) ? bmi_tcp_errno_to_pvfs(ret) : ret;
987}
988
989
990/* BMI_tcp_post_send()
991 *
992 * Submits send operations.
993 *
994 * returns 0 on success that requires later poll, returns 1 on instant
995 * completion, -errno on failure
996 */
997int BMI_tcp_post_send(bmi_op_id_t * id,
998                      bmi_method_addr_p dest,
999                      const void *buffer,
1000                      bmi_size_t size,
1001                      enum bmi_buffer_type buffer_type,
1002                      bmi_msg_tag_t tag,
1003                      void *user_ptr,
1004                      bmi_context_id context_id,
1005                      PVFS_hint hints)
1006{
1007    struct tcp_msg_header my_header;
1008    int ret = -1;
1009
1010    /* clear the id field for safety */
1011    *id = 0;
1012
1013    /* fill in the TCP-specific message header */
1014    if (size > TCP_MODE_REND_LIMIT)
1015    {
1016        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1017    }
1018
1019    if (size <= TCP_MODE_EAGER_LIMIT)
1020    {
1021        my_header.mode = TCP_MODE_EAGER;
1022    }
1023    else
1024    {
1025        my_header.mode = TCP_MODE_REND;
1026    }
1027    my_header.tag = tag;
1028    my_header.size = size;
1029    my_header.magic_nr = BMI_MAGIC_NR;
1030
1031    gen_mutex_lock(&interface_mutex);
1032
1033    ret = tcp_post_send_generic(id, dest, &buffer,
1034                                &size, 1, buffer_type, my_header,
1035                                user_ptr, context_id, hints);
1036
1037    gen_mutex_unlock(&interface_mutex);
1038    return(ret);
1039}
1040
1041
1042/* BMI_tcp_post_sendunexpected()
1043 *
1044 * Submits unexpected send operations.
1045 *
1046 * returns 0 on success that requires later poll, returns 1 on instant
1047 * completion, -errno on failure
1048 */
1049int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
1050                                bmi_method_addr_p dest,
1051                                const void *buffer,
1052                                bmi_size_t size,
1053                                enum bmi_buffer_type buffer_type,
1054                                bmi_msg_tag_t tag,
1055                                void *user_ptr,
1056                                bmi_context_id context_id,
1057                                PVFS_hint hints)
1058{
1059    struct tcp_msg_header my_header;
1060    int ret = -1;
1061
1062    /* clear the id field for safety */
1063    *id = 0;
1064
1065    if (size > TCP_MODE_EAGER_LIMIT)
1066    {
1067        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1068    }
1069
1070    my_header.mode = TCP_MODE_UNEXP;
1071    my_header.tag = tag;
1072    my_header.size = size;
1073    my_header.magic_nr = BMI_MAGIC_NR;
1074
1075    gen_mutex_lock(&interface_mutex);
1076
1077    ret = tcp_post_send_generic(id, dest, &buffer,
1078                                &size, 1, buffer_type, my_header,
1079                                user_ptr, context_id, hints);
1080    gen_mutex_unlock(&interface_mutex);
1081    return(ret);
1082}
1083
1084
1085
1086/* BMI_tcp_post_recv()
1087 *
1088 * Submits recv operations.
1089 *
1090 * returns 0 on success that requires later poll, returns 1 on instant
1091 * completion, -errno on failure
1092 */
1093int BMI_tcp_post_recv(bmi_op_id_t * id,
1094                      bmi_method_addr_p src,
1095                      void *buffer,
1096                      bmi_size_t expected_size,
1097                      bmi_size_t * actual_size,
1098                      enum bmi_buffer_type buffer_type,
1099                      bmi_msg_tag_t tag,
1100                      void *user_ptr,
1101                      bmi_context_id context_id,
1102                      PVFS_hint hints)
1103{
1104    int ret = -1;
1105
1106    /* A few things could happen here:
1107     * a) rendez. recv with sender not ready yet
1108     * b) rendez. recv with sender waiting
1109     * c) eager recv, data not available yet
1110     * d) eager recv, some/all data already here
1111     * e) rendez. recv with sender in eager mode
1112     *
1113     * b or d could lead to completion without polling.
1114     * we don't look for unexpected messages here.
1115     */
1116
1117    if (expected_size > TCP_MODE_REND_LIMIT)
1118    {
1119        return (bmi_tcp_errno_to_pvfs(-EINVAL));
1120    }
1121    gen_mutex_lock(&interface_mutex);
1122
1123    ret = tcp_post_recv_generic(id, src, &buffer, &expected_size,
1124                                1, expected_size, actual_size,
1125                                buffer_type, tag,
1126                                user_ptr, context_id, hints);
1127
1128    gen_mutex_unlock(&interface_mutex);
1129    return (ret);
1130}
1131
1132
1133/* BMI_tcp_test()
1134 *
1135 * Checks to see if a particular message has completed.
1136 *
1137 * returns 0 on success, -errno on failure
1138 */
1139int BMI_tcp_test(bmi_op_id_t id,
1140                 int *outcount,
1141                 bmi_error_code_t * error_code,
1142                 bmi_size_t * actual_size,
1143                 void **user_ptr,
1144                 int max_idle_time,
1145                 bmi_context_id context_id)
1146{
1147    int ret = -1;
1148    method_op_p query_op = (method_op_p)id_gen_fast_lookup(id);
1149
1150    assert(query_op != NULL);
1151
1152    gen_mutex_lock(&interface_mutex);
1153
1154    /* do some ``real work'' here */
1155    ret = tcp_do_work(max_idle_time);
1156    if (ret < 0)
1157    {
1158        gen_mutex_unlock(&interface_mutex);
1159        return (ret);
1160    }
1161
1162    if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1163        BMI_TCP_COMPLETE)
1164    {
1165        assert(query_op->context_id == context_id);
1166        op_list_remove(query_op);
1167        if (user_ptr != NULL)
1168        {
1169            (*user_ptr) = query_op->user_ptr;
1170        }
1171        (*error_code) = query_op->error_code;
1172        (*actual_size) = query_op->actual_size;
1173        PINT_EVENT_END(
1174            (query_op->send_recv == BMI_SEND ?
1175             bmi_tcp_send_event_id : bmi_tcp_recv_event_id), bmi_tcp_pid, NULL,
1176             query_op->event_id, id, *actual_size);
1177
1178        dealloc_tcp_method_op(query_op);
1179        (*outcount)++;
1180    }
1181
1182    gen_mutex_unlock(&interface_mutex);
1183    return (0);
1184}
1185
1186/* BMI_tcp_testsome()
1187 *
1188 * Checks to see if any messages from the specified list have completed.
1189 *
1190 * returns 0 on success, -errno on failure
1191 */
1192int BMI_tcp_testsome(int incount,
1193                     bmi_op_id_t * id_array,
1194                     int *outcount,
1195                     int *index_array,
1196                     bmi_error_code_t * error_code_array,
1197                     bmi_size_t * actual_size_array,
1198                     void **user_ptr_array,
1199                     int max_idle_time,
1200                     bmi_context_id context_id)
1201{
1202    int ret = -1;
1203    method_op_p query_op = NULL;
1204    int i;
1205
1206    gen_mutex_lock(&interface_mutex);
1207
1208    /* do some ``real work'' here */
1209    ret = tcp_do_work(max_idle_time);
1210    if (ret < 0)
1211    {
1212        gen_mutex_unlock(&interface_mutex);
1213        return (ret);
1214    }
1215
1216    for(i=0; i<incount; i++)
1217    {
1218        if(id_array[i])
1219        {
1220            /* NOTE: this depends on the user passing in valid id's;
1221             * otherwise we segfault. 
1222             */
1223            query_op = (method_op_p)id_gen_fast_lookup(id_array[i]);
1224            if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1225               BMI_TCP_COMPLETE)
1226            {
1227                assert(query_op->context_id == context_id);
1228                /* this one's done; pop it out */
1229                op_list_remove(query_op);
1230                error_code_array[*outcount] = query_op->error_code;
1231                actual_size_array[*outcount] = query_op->actual_size;
1232                index_array[*outcount] = i;
1233                if (user_ptr_array != NULL)
1234                {
1235                    user_ptr_array[*outcount] = query_op->user_ptr;
1236                }
1237                PINT_EVENT_END(
1238                    (query_op->send_recv == BMI_SEND ?
1239                     bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
1240                    bmi_tcp_pid, NULL,
1241                    query_op->event_id, actual_size_array[*outcount]);
1242                dealloc_tcp_method_op(query_op);
1243                (*outcount)++;
1244            }
1245        }
1246    }
1247
1248    gen_mutex_unlock(&interface_mutex);
1249    return(0);
1250}
1251
1252
1253/* BMI_tcp_testunexpected()
1254 *
1255 * Checks to see if any unexpected messages have completed.
1256 *
1257 * returns 0 on success, -errno on failure
1258 */
1259int BMI_tcp_testunexpected(int incount,
1260                           int *outcount,
1261                           struct bmi_method_unexpected_info *info,
1262                           int max_idle_time)
1263{
1264    int ret = -1;
1265    method_op_p query_op = NULL;
1266
1267    gen_mutex_lock(&interface_mutex);
1268
1269    if(op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
1270    {
1271        /* do some ``real work'' here */
1272        ret = tcp_do_work(max_idle_time);
1273        if (ret < 0)
1274        {
1275            gen_mutex_unlock(&interface_mutex);
1276            return (ret);
1277        }
1278    }
1279
1280    *outcount = 0;
1281
1282    /* go through the completed/unexpected list as long as we are finding
1283     * stuff and we have room in the info array for it
1284     */
1285    while ((*outcount < incount) &&
1286           (query_op =
1287            op_list_shownext(op_list_array[IND_COMPLETE_RECV_UNEXP])))
1288    {
1289        info[*outcount].error_code = query_op->error_code;
1290        info[*outcount].addr = query_op->addr;
1291        info[*outcount].buffer = query_op->buffer;
1292        info[*outcount].size = query_op->actual_size;
1293        info[*outcount].tag = query_op->msg_tag;
1294        op_list_remove(query_op);
1295        dealloc_tcp_method_op(query_op);
1296        (*outcount)++;
1297    }
1298    gen_mutex_unlock(&interface_mutex);
1299    return (0);
1300}
1301
1302
1303/* BMI_tcp_testcontext()
1304 *
1305 * Checks to see if any messages from the specified context have completed.
1306 *
1307 * returns 0 on success, -errno on failure
1308 */
1309int BMI_tcp_testcontext(int incount,
1310                     bmi_op_id_t* out_id_array,
1311                     int *outcount,
1312                     bmi_error_code_t * error_code_array,
1313                     bmi_size_t * actual_size_array,
1314                     void **user_ptr_array,
1315                     int max_idle_time,
1316                     bmi_context_id context_id)
1317{
1318    int ret = -1;
1319    method_op_p query_op = NULL;
1320
1321    *outcount = 0;
1322
1323    gen_mutex_lock(&interface_mutex);
1324
1325    if(op_list_empty(completion_array[context_id]))
1326    {
1327        /* if there are unexpected ops ready to go, then short out so
1328         * that the next testunexpected call can pick it up without
1329         * delay
1330         */
1331        if(check_unexpected &&
1332           !op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
1333        {
1334            gen_mutex_unlock(&interface_mutex);
1335            return(0);
1336        }
1337
1338        /* do some ``real work'' here */
1339        ret = tcp_do_work(max_idle_time);
1340        if (ret < 0)
1341        {
1342            gen_mutex_unlock(&interface_mutex);
1343            return (ret);
1344        }
1345    }
1346
1347    /* pop as many items off of the completion queue as we can */
1348    while((*outcount < incount) &&
1349          (query_op =
1350           op_list_shownext(completion_array[context_id])))
1351    {
1352        assert(query_op);
1353        assert(query_op->context_id == context_id);
1354
1355        /* this one's done; pop it out */
1356        op_list_remove(query_op);
1357        error_code_array[*outcount] = query_op->error_code;
1358        actual_size_array[*outcount] = query_op->actual_size;
1359        out_id_array[*outcount] = query_op->op_id;
1360        if (user_ptr_array != NULL)
1361        {
1362            user_ptr_array[*outcount] = query_op->user_ptr;
1363        }
1364
1365        PINT_EVENT_END((query_op->send_recv == BMI_SEND ?
1366                        bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
1367                       bmi_tcp_pid, NULL, query_op->event_id,
1368                       query_op->actual_size);
1369
1370        dealloc_tcp_method_op(query_op);
1371        query_op = NULL;
1372        (*outcount)++;
1373    }
1374
1375    gen_mutex_unlock(&interface_mutex);
1376    return(0);
1377}
1378
1379
1380
1381/* BMI_tcp_post_send_list()
1382 *
1383 * same as the BMI_tcp_post_send() function, except that it sends
1384 * from an array of possibly non contiguous buffers
1385 *
1386 * returns 0 on success, 1 on immediate successful completion,
1387 * -errno on failure
1388 */
1389int BMI_tcp_post_send_list(bmi_op_id_t * id,
1390                           bmi_method_addr_p dest,
1391                           const void *const *buffer_list,
1392                           const bmi_size_t *size_list,
1393                           int list_count,
1394                           bmi_size_t total_size,
1395                           enum bmi_buffer_type buffer_type,
1396                           bmi_msg_tag_t tag,
1397                           void *user_ptr,
1398                           bmi_context_id context_id,
1399                           PVFS_hint hints)
1400{
1401    struct tcp_msg_header my_header;
1402    int ret = -1;
1403
1404    /* clear the id field for safety */
1405    *id = 0;
1406
1407    /* fill in the TCP-specific message header */
1408    if (total_size > TCP_MODE_REND_LIMIT)
1409    {
1410        gossip_lerr("Error: BMI message too large!\n");
1411        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1412    }
1413
1414    if (total_size <= TCP_MODE_EAGER_LIMIT)
1415    {
1416        my_header.mode = TCP_MODE_EAGER;
1417    }
1418    else
1419    {
1420        my_header.mode = TCP_MODE_REND;
1421    }
1422    my_header.tag = tag;
1423    my_header.size = total_size;
1424    my_header.magic_nr = BMI_MAGIC_NR;
1425
1426    gen_mutex_lock(&interface_mutex);
1427
1428    ret = tcp_post_send_generic(id, dest, buffer_list,
1429                                size_list, list_count, buffer_type,
1430                                my_header, user_ptr, context_id, hints);
1431    gen_mutex_unlock(&interface_mutex);
1432    return(ret);
1433}
1434
1435/* BMI_tcp_post_recv_list()
1436 *
1437 * same as the BMI_tcp_post_recv() function, except that it recvs
1438 * into an array of possibly non contiguous buffers
1439 *
1440 * returns 0 on success, 1 on immediate successful completion,
1441 * -errno on failure
1442 */
1443int BMI_tcp_post_recv_list(bmi_op_id_t * id,
1444                           bmi_method_addr_p src,
1445                           void *const *buffer_list,
1446                           const bmi_size_t *size_list,
1447                           int list_count,
1448                           bmi_size_t total_expected_size,
1449                           bmi_size_t * total_actual_size,
1450                           enum bmi_buffer_type buffer_type,
1451                           bmi_msg_tag_t tag,
1452                           void *user_ptr,
1453                           bmi_context_id context_id,
1454                           PVFS_hint hints)
1455{
1456    int ret = -1;
1457
1458    if (total_expected_size > TCP_MODE_REND_LIMIT)
1459    {
1460        return (bmi_tcp_errno_to_pvfs(-EINVAL));
1461    }
1462
1463    gen_mutex_lock(&interface_mutex);
1464
1465    ret = tcp_post_recv_generic(id, src, buffer_list, size_list,
1466                                list_count, total_expected_size,
1467                                total_actual_size, buffer_type, tag, user_ptr,
1468                                context_id, hints);
1469
1470    gen_mutex_unlock(&interface_mutex);
1471    return (ret);
1472}
1473
1474
1475/* BMI_tcp_post_sendunexpected_list()
1476 *
1477 * same as the BMI_tcp_post_sendunexpected() function, except that
1478 * it sends from an array of possibly non contiguous buffers
1479 *
1480 * returns 0 on success, 1 on immediate successful completion,
1481 * -errno on failure
1482 */
1483int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
1484                                     bmi_method_addr_p dest,
1485                                     const void *const *buffer_list,
1486                                     const bmi_size_t *size_list,
1487                                     int list_count,
1488                                     bmi_size_t total_size,
1489                                     enum bmi_buffer_type buffer_type,
1490                                     bmi_msg_tag_t tag,
1491                                     void *user_ptr,
1492                                     bmi_context_id context_id,
1493                                     PVFS_hint hints)
1494{
1495    struct tcp_msg_header my_header;
1496    int ret = -1;
1497
1498    /* clear the id field for safety */
1499    *id = 0;
1500
1501    if (total_size > TCP_MODE_EAGER_LIMIT)
1502    {
1503        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1504    }
1505
1506    my_header.mode = TCP_MODE_UNEXP;
1507    my_header.tag = tag;
1508    my_header.size = total_size;
1509    my_header.magic_nr = BMI_MAGIC_NR;
1510
1511    gen_mutex_lock(&interface_mutex);
1512
1513    ret = tcp_post_send_generic(id, dest, buffer_list,
1514                                size_list, list_count, buffer_type,
1515                                my_header, user_ptr, context_id, hints);
1516
1517    gen_mutex_unlock(&interface_mutex);
1518    return(ret);
1519}
1520
1521
1522/* BMI_tcp_open_context()
1523 *
1524 * opens a new context with the specified context id
1525 *
1526 * returns 0 on success, -errno on failure
1527 */
1528int BMI_tcp_open_context(bmi_context_id context_id)
1529{
1530
1531    gen_mutex_lock(&interface_mutex);
1532
1533    /* start a new queue for tracking completions in this context */
1534    completion_array[context_id] = op_list_new();
1535    if (!completion_array[context_id])
1536    {
1537        gen_mutex_unlock(&interface_mutex);
1538        return(bmi_tcp_errno_to_pvfs(-ENOMEM));
1539    }
1540
1541    gen_mutex_unlock(&interface_mutex);
1542    return(0);
1543}
1544
1545
1546/* BMI_tcp_close_context()
1547 *
1548 * shuts down a context, previously opened with BMI_tcp_open_context()
1549 *
1550 * no return value
1551 */
1552void BMI_tcp_close_context(bmi_context_id context_id)
1553{
1554   
1555    gen_mutex_lock(&interface_mutex);
1556
1557    /* tear down completion queue for this context */
1558    op_list_cleanup(completion_array[context_id]);
1559
1560    gen_mutex_unlock(&interface_mutex);
1561    return;
1562}
1563
1564
1565/* BMI_tcp_cancel()
1566 *
1567 * attempt to cancel a pending bmi tcp operation
1568 *
1569 * returns 0 on success, -errno on failure
1570 */
1571int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id)
1572{
1573    method_op_p query_op = NULL;
1574   
1575    gen_mutex_lock(&interface_mutex);
1576
1577    query_op = (method_op_p)id_gen_fast_lookup(id);
1578    if(!query_op)
1579    {
1580        /* if we can't find the operattion, then assume that it has already
1581         * completed naturally
1582         */
1583        gen_mutex_unlock(&interface_mutex);
1584        return(0);
1585    }
1586
1587    /* easy case: is the operation already completed? */
1588    if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1589        BMI_TCP_COMPLETE)
1590    {
1591        /* only close socket in forceful cancel mode */
1592        if(forceful_cancel_mode)
1593            tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1594        /* we are done! status will be collected during test */
1595        gen_mutex_unlock(&interface_mutex);
1596        return(0);
1597    }
1598
1599    /* has the operation started moving data yet? */
1600    if(query_op->env_amt_complete)
1601    {
1602        /* be pessimistic and kill the socket, even if not in forceful
1603         * cancel mode */
1604        /* NOTE: this may place other operations beside this one into
1605         * EINTR error state
1606         */
1607        tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1608        gen_mutex_unlock(&interface_mutex);
1609        return(0);
1610    }
1611
1612    /* if we fall to this point, op has been posted, but no data has moved
1613     * for it yet as far as we know
1614     */
1615
1616    /* mark op as canceled, move to completion queue */
1617    query_op->error_code = -BMI_ECANCEL;
1618    if(query_op->send_recv == BMI_SEND)
1619    {
1620        BMI_socket_collection_remove_write_bit(tcp_socket_collection_p,
1621                                           query_op->addr);
1622    }
1623    op_list_remove(query_op);
1624    ((struct tcp_op*)(query_op->method_data))->tcp_op_state =
1625        BMI_TCP_COMPLETE;
1626    /* only close socket in forceful cancel mode */
1627    if(forceful_cancel_mode)
1628        tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1629    op_list_add(completion_array[query_op->context_id], query_op);
1630    gen_mutex_unlock(&interface_mutex);
1631    return(0);
1632}
1633
1634/*
1635 * For now, we only support wildcard strings that are IP addresses
1636 * and not *hostnames*!
1637 */
1638static int check_valid_wildcard(const char *wildcard_string, unsigned long *octets)
1639{
1640    int i, len = strlen(wildcard_string), last_dot = -1, octet_count = 0;
1641    char str[16];
1642    for (i = 0; i < len; i++)
1643    {
1644        char c = wildcard_string[i];
1645        memset(str, 0, 16);
1646        if ((c < '0' || c > '9') && c != '*' && c != '.')
1647            return -EINVAL;
1648        if (c == '*') {
1649            if (octet_count >= 4)
1650                return -EINVAL;
1651            octets[octet_count++] = 256;
1652        }
1653        else if (c == '.')
1654        {
1655            char *endptr = NULL;
1656            if (octet_count >= 4)
1657                return -EINVAL;
1658            strncpy(str, &wildcard_string[last_dot + 1], (i - last_dot - 1));
1659            octets[octet_count++] = strtol(str, &endptr, 10);
1660            if (*endptr != '\0' || octets[octet_count-1] >= 256)
1661                return -EINVAL;
1662            last_dot = i;
1663        }
1664    }
1665    for (i = octet_count; i < 4; i++)
1666    {
1667         octets[i] = 256;
1668    }
1669    return 0;
1670}
1671
1672/*
1673 * return 1 if the addr specified is part of the wildcard specification of octet
1674 * return 0 otherwise.
1675 */
1676static int check_octets(struct in_addr addr, unsigned long *octets)
1677{
1678#define B1_MASK  0xff000000
1679#define B1_SHIFT 24
1680#define B2_MASK  0x00ff0000
1681#define B2_SHIFT 16
1682#define B3_MASK  0x0000ff00
1683#define B3_SHIFT 8
1684#define B4_MASK  0x000000ff
1685    uint32_t host_addr = ntohl(addr.s_addr);
1686    /* * stands for all clients */
1687    if (octets[0] == 256)
1688    {
1689        return 1;
1690    }
1691    if (((host_addr & B1_MASK) >> B1_SHIFT) != octets[0])
1692    {
1693        return 0;
1694    }
1695    if (octets[1] == 256)
1696    {
1697        return 1;
1698    }
1699    if (((host_addr & B2_MASK) >> B2_SHIFT) != octets[1])
1700    {
1701        return 0;
1702    }
1703    if (octets[2] == 256)
1704    {
1705        return 1;
1706    }
1707    if (((host_addr & B3_MASK) >> B3_SHIFT) != octets[2])
1708    {
1709        return 0;
1710    }
1711    if (octets[3] == 256)
1712    {
1713        return 1;
1714    }
1715    if ((host_addr & B4_MASK) != octets[3])
1716    {
1717        return 0;
1718    }
1719    return 1;
1720#undef B1_MASK
1721#undef B1_SHIFT
1722#undef B2_MASK
1723#undef B2_SHIFT
1724#undef B3_MASK
1725#undef B3_SHIFT
1726#undef B4_MASK
1727}
1728/* BMI_tcp_query_addr_range()
1729 * Check if a given address is within the network specified by the wildcard string!
1730 * or if it is part of the subnet mask specified
1731 */
1732int BMI_tcp_query_addr_range(bmi_method_addr_p map, const char *wildcard_string, int netmask)
1733{
1734    struct tcp_addr *tcp_addr_data = map->method_data;
1735    struct sockaddr_in map_addr;
1736    socklen_t map_addr_len = sizeof(map_addr);
1737    const char *tcp_wildcard = wildcard_string + 6 /* strlen("tcp://") */;
1738    int ret = -1;
1739
1740    memset(&map_addr, 0, sizeof(map_addr));
1741    if(getpeername(tcp_addr_data->socket, (struct sockaddr *) &map_addr, &map_addr_len) < 0)
1742    {
1743        ret =  bmi_tcp_errno_to_pvfs(-EINVAL);
1744        gossip_err("Error: failed to retrieve peer name for client.\n");
1745        return(ret);
1746    }
1747    /* Wildcard specification */
1748    if (netmask == -1)
1749    {
1750        unsigned long octets[4];
1751        if (check_valid_wildcard(tcp_wildcard, octets) < 0)
1752        {
1753            gossip_lerr("Invalid wildcard specification: %s\n", tcp_wildcard);
1754            return -EINVAL;
1755        }
1756        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Map Address is : %s, Wildcard Octets: %lu.%lu.%lu.%lu\n", inet_ntoa(map_addr.sin_addr),
1757                octets[0], octets[1], octets[2], octets[3]);
1758        if (check_octets(map_addr.sin_addr, octets) == 1)
1759        {
1760            return 1;
1761        }
1762    }
1763    /* Netmask specification */
1764    else {
1765        struct sockaddr_in mask_addr, network_addr;
1766        memset(&mask_addr, 0, sizeof(mask_addr));
1767        memset(&network_addr, 0, sizeof(network_addr));
1768        /* Convert the netmask address */
1769        convert_mask(netmask, &mask_addr.sin_addr);
1770        /* Invalid network address */
1771        if (inet_aton(tcp_wildcard, &network_addr.sin_addr) == 0)
1772        {
1773            gossip_err("Invalid network specification: %s\n", tcp_wildcard);
1774            return -EINVAL;
1775        }
1776        /* Matches the subnet mask! */
1777        if ((map_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr)
1778                == (network_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr))
1779        {
1780            return 1;
1781        }
1782    }
1783    return 0;
1784}
1785
1786/* BMI_tcp_addr_rev_lookup_unexpected()
1787 *
1788 * looks up an address that was initialized unexpectedly and returns a string
1789 * hostname
1790 *
1791 * returns string on success, "UNKNOWN" on failure
1792 */
1793const char* BMI_tcp_addr_rev_lookup_unexpected(bmi_method_addr_p map)
1794{
1795    struct tcp_addr *tcp_addr_data = map->method_data;
1796    int debug_on;
1797    uint64_t mask;
1798    socklen_t peerlen;
1799    struct sockaddr_in peer;
1800    int ret;
1801    struct hostent *peerent;
1802    char* tmp_peer;
1803
1804    /* return default response if we don't have support for the right socket
1805     * calls
1806     */
1807#if !defined(HAVE_GETHOSTBYADDR)
1808    return(tcp_addr_data->peer);
1809#else
1810
1811    /* Only resolve hostnames if a gossip mask is set to request it.
1812     * Otherwise we leave it at ip address
1813     */
1814    gossip_get_debug_mask(&debug_on, &mask);
1815
1816    if(!debug_on || (!(mask & GOSSIP_ACCESS_HOSTNAMES)))
1817    {
1818        return(tcp_addr_data->peer);
1819    }
1820
1821    peerlen = sizeof(struct sockaddr_in);
1822
1823    if(tcp_addr_data->peer_type == BMI_TCP_PEER_HOSTNAME)
1824    {
1825        /* full hostname already cached; return now */
1826        return(tcp_addr_data->peer);
1827    }
1828
1829    /* if we hit this point, we need to resolve hostname */
1830    ret = getpeername(tcp_addr_data->socket, (struct sockaddr*)&(peer), &peerlen);
1831    if(ret < 0)
1832    {
1833        /* default to use IP address */
1834        return(tcp_addr_data->peer);
1835    }
1836
1837    peerent = gethostbyaddr((void*)&peer.sin_addr.s_addr,
1838        sizeof(struct in_addr), AF_INET);
1839    if(peerent == NULL)
1840    {
1841        /* default to use IP address */
1842        return(tcp_addr_data->peer);
1843    }
1844 
1845    tmp_peer = (char*)malloc(strlen(peerent->h_name) + 1);
1846    if(!tmp_peer)
1847    {
1848        /* default to use IP address */
1849        return(tcp_addr_data->peer);
1850    }
1851    strcpy(tmp_peer, peerent->h_name);
1852    if(tcp_addr_data->peer)
1853    {
1854        free(tcp_addr_data->peer);
1855    }
1856    tcp_addr_data->peer = tmp_peer;
1857    tcp_addr_data->peer_type = BMI_TCP_PEER_HOSTNAME;
1858    return(tcp_addr_data->peer);
1859
1860#endif
1861
1862}
1863
1864/* tcp_forget_addr()
1865 *
1866 * completely removes a tcp method address from use, and aborts any
1867 * operations that use the address.  If the
1868 * dealloc_flag is set, the memory used by the address will be
1869 * deallocated as well.
1870 *
1871 * no return value
1872 */
1873void tcp_forget_addr(bmi_method_addr_p map,
1874                     int dealloc_flag,
1875                     int error_code)
1876{
1877    struct tcp_addr* tcp_addr_data = map->method_data;
1878    BMI_addr_t bmi_addr = tcp_addr_data->bmi_addr;
1879    int tmp_outcount;
1880    bmi_method_addr_p tmp_addr;
1881    int tmp_status;
1882
1883    if (tcp_socket_collection_p)
1884    {
1885        BMI_socket_collection_remove(tcp_socket_collection_p, map);
1886        /* perform a test to force the socket collection to act on the remove
1887         * request before continuing
1888         */
1889        if(!sc_test_busy)
1890        {
1891            BMI_socket_collection_testglobal(tcp_socket_collection_p,
1892                0, &tmp_outcount, &tmp_addr, &tmp_status, 0);
1893        }
1894    }
1895
1896    tcp_shutdown_addr(map);
1897    tcp_cleanse_addr(map, error_code);
1898    tcp_addr_data->addr_error = error_code;
1899    if (dealloc_flag)
1900    {
1901        dealloc_tcp_method_addr(map);
1902    }
1903    else
1904    {
1905        /* this will cause the bmi control layer to check to see if
1906         * this address can be completely forgotten
1907         */
1908        bmi_method_addr_forget_callback(bmi_addr);
1909    }
1910    return;
1911};
1912
1913/******************************************************************
1914 * Internal support functions
1915 */
1916
1917
1918/*
1919 * dealloc_tcp_method_addr()
1920 *
1921 * destroys method address structures generated by the TCP/IP module.
1922 *
1923 * no return value
1924 */
1925static void dealloc_tcp_method_addr(bmi_method_addr_p map)
1926{
1927
1928    struct tcp_addr *tcp_addr_data = NULL;
1929
1930    tcp_addr_data = map->method_data;
1931    /* close the socket, as long as it is not the one we are listening on
1932     * as a server.
1933     */
1934    if (!tcp_addr_data->server_port)
1935    {
1936        if (tcp_addr_data->socket > -1)
1937        {
1938            close(tcp_addr_data->socket);
1939        }
1940    }
1941
1942    if (tcp_addr_data->hostname)
1943        free(tcp_addr_data->hostname);
1944    if (tcp_addr_data->peer)
1945        free(tcp_addr_data->peer);
1946
1947    bmi_dealloc_method_addr(map);
1948
1949    return;
1950}
1951
1952
1953/*
1954 * alloc_tcp_method_addr()
1955 *
1956 * creates a new method address with defaults filled in for TCP/IP.
1957 *
1958 * returns pointer to struct on success, NULL on failure
1959 */
1960bmi_method_addr_p alloc_tcp_method_addr(void)
1961{
1962
1963    struct bmi_method_addr *my_method_addr = NULL;
1964    struct tcp_addr *tcp_addr_data = NULL;
1965
1966    my_method_addr =
1967        bmi_alloc_method_addr(tcp_method_params.method_id, sizeof(struct tcp_addr));
1968    if (!my_method_addr)
1969    {
1970        return (NULL);
1971    }
1972
1973    /* note that we trust the alloc_method_addr() function to have zeroed
1974     * out the structures for us already
1975     */
1976
1977    tcp_addr_data = my_method_addr->method_data;
1978    tcp_addr_data->socket = -1;
1979    tcp_addr_data->port = -1;
1980    tcp_addr_data->map = my_method_addr;
1981    tcp_addr_data->sc_index = -1;
1982
1983    return (my_method_addr);
1984}
1985
1986
1987/*
1988 * tcp_server_init()
1989 *
1990 * this function is used to prepare a node to recieve incoming
1991 * connections if it is initialized in a server configuration.   
1992 *
1993 * returns 0 on succes, -errno on failure
1994 */
1995static int tcp_server_init(void)
1996{
1997
1998    int oldfl = 0;              /* old socket flags */
1999    struct tcp_addr *tcp_addr_data = NULL;
2000    int tmp_errno = bmi_tcp_errno_to_pvfs(-EINVAL);
2001    int ret = 0;
2002
2003    /* create a socket */
2004    tcp_addr_data = tcp_method_params.listen_addr->method_data;
2005    if ((tcp_addr_data->socket = BMI_sockio_new_sock()) < 0)
2006    {
2007        tmp_errno = errno;
2008        gossip_err("Error: BMI_sockio_new_sock: %s\n", strerror(tmp_errno));
2009        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2010    }
2011
2012    /* set it to non-blocking operation */
2013    oldfl = fcntl(tcp_addr_data->socket, F_GETFL, 0);
2014    if (!(oldfl & O_NONBLOCK))
2015    {
2016        fcntl(tcp_addr_data->socket, F_SETFL, oldfl | O_NONBLOCK);
2017    }
2018
2019    /* setup for a fast restart to avoid bind addr in use errors */
2020    BMI_sockio_set_sockopt(tcp_addr_data->socket, SO_REUSEADDR, 1);
2021
2022    /* bind it to the appropriate port */
2023    if(tcp_method_params.method_flags & BMI_TCP_BIND_SPECIFIC)
2024    {
2025        ret = BMI_sockio_bind_sock_specific(tcp_addr_data->socket,
2026            tcp_addr_data->hostname,
2027            tcp_addr_data->port);
2028        /* NOTE: this particular function converts errno in advance */
2029        if(ret < 0)
2030        {
2031            PVFS_perror_gossip("BMI_sockio_bind_sock_specific", ret);
2032            return(ret);
2033        }
2034    }
2035    else
2036    {
2037        ret = BMI_sockio_bind_sock(tcp_addr_data->socket,
2038            tcp_addr_data->port);
2039    }
2040   
2041    if (ret < 0)
2042    {
2043        tmp_errno = errno;
2044        gossip_err("Error: BMI_sockio_bind_sock: %s\n", strerror(tmp_errno));
2045        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2046    }
2047
2048    /* go ahead and listen to the socket */
2049    if (listen(tcp_addr_data->socket, TCP_BACKLOG) != 0)
2050    {
2051        tmp_errno = errno;
2052        gossip_err("Error: listen: %s\n", strerror(tmp_errno));
2053        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2054    }
2055
2056    return (0);
2057}
2058
2059
2060/* find_recv_inflight()
2061 *
2062 * checks to see if there is a recv operation in flight (when in flight
2063 * means that some of the data or envelope has been read) for a
2064 * particular address.
2065 *
2066 * returns pointer to operation on success, NULL if nothing found.
2067 */
2068static method_op_p find_recv_inflight(bmi_method_addr_p map)
2069{
2070    struct op_list_search_key key;
2071    method_op_p query_op = NULL;
2072
2073    memset(&key, 0, sizeof(struct op_list_search_key));
2074    key.method_addr = map;
2075    key.method_addr_yes = 1;
2076
2077    query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
2078
2079    return (query_op);
2080}
2081
2082
2083/* tcp_sock_init()
2084 *
2085 * this is an internal function which is used to build up a TCP/IP
2086 * connection in the situation of a client side operation.
2087 * addressing information to determine which fields need to be set.
2088 * If the connection is already established then it does no work.
2089 *
2090 * NOTE: this is safe to call repeatedly.  However, always check the
2091 * value of the not_connected field in the tcp address before using the
2092 * address.
2093 *
2094 * returns 0 on success, -errno on failure
2095 */
2096static int tcp_sock_init(bmi_method_addr_p my_method_addr)
2097{
2098
2099    int oldfl = 0;              /* socket flags */
2100    int ret = -1;
2101    struct pollfd poll_conn;
2102    struct tcp_addr *tcp_addr_data = my_method_addr->method_data;
2103    int tmp_errno = 0;
2104
2105    /* check for obvious problems */
2106    assert(my_method_addr);
2107    assert(my_method_addr->method_type == tcp_method_params.method_id);
2108    assert(tcp_addr_data->server_port == 0);
2109
2110    /* fail immediately if the address is in failure mode and we have no way
2111     * to reconnect
2112     */
2113    if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
2114    {
2115        gossip_debug(GOSSIP_BMI_DEBUG_TCP,
2116        "Warning: BMI communication attempted on an address in failure mode.\n");
2117        return(tcp_addr_data->addr_error);
2118    }
2119
2120    if(tcp_addr_data->addr_error)
2121    {
2122        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "%s: attempting reconnect.\n",
2123          __func__);
2124        tcp_addr_data->addr_error = 0;
2125        assert(tcp_addr_data->socket < 0);
2126        tcp_addr_data->not_connected = 1;
2127    }
2128
2129    /* is there already a socket? */
2130    if (tcp_addr_data->socket > -1)
2131    {
2132        /* check to see if we still need to work on the connect.. */
2133        if (tcp_addr_data->not_connected)
2134        {
2135            /* this is a little weird, but we complete the nonblocking
2136             * connection by polling */
2137            poll_conn.fd = tcp_addr_data->socket;
2138            poll_conn.events = POLLOUT;
2139            ret = poll(&poll_conn, 1, 2);
2140            if ((ret < 0) || (poll_conn.revents & POLLERR))
2141            {
2142                tmp_errno = errno;
2143                gossip_lerr("Error: poll: %s\n", strerror(tmp_errno));
2144                return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2145            }
2146            if (poll_conn.revents & POLLOUT)
2147            {
2148                tcp_addr_data->not_connected = 0;
2149            }
2150        }
2151        /* return.  the caller should check the "not_connected" flag to
2152         * see if the socket is usable yet. */
2153        return (0);
2154    }
2155 
2156 /* This function call makes no sense - we know the socket
2157  * is not a valid value, otherwise we would have taken the
2158  * above branch.  Commenting out for now WBL 9/11
2159  */
2160/*    bmi_set_sock_buffers(tcp_addr_data->socket); */
2161
2162    /* at this point there is no socket.  try to build it */
2163    if (tcp_addr_data->port < 1)
2164    {
2165        return (bmi_tcp_errno_to_pvfs(-EINVAL));
2166    }
2167
2168    /* make a socket */
2169    if ((tcp_addr_data->socket = BMI_sockio_new_sock()) < 0)
2170    {
2171        tmp_errno = errno;
2172        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2173    }
2174
2175    /* set it to non-blocking operation */
2176    oldfl = fcntl(tcp_addr_data->socket, F_GETFL, 0);
2177    if (!(oldfl & O_NONBLOCK))
2178    {
2179        fcntl(tcp_addr_data->socket, F_SETFL, oldfl | O_NONBLOCK);
2180    }
2181
2182#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
2183    /* make sure if we need to bind or not to some local port ranges */
2184    tcp_enable_trusted(tcp_addr_data);
2185#endif
2186
2187    /* turn off Nagle's algorithm */
2188    if (BMI_sockio_set_tcpopt(tcp_addr_data->socket, TCP_NODELAY, 1) < 0)
2189    {
2190        tmp_errno = errno;
2191        gossip_lerr("Error: failed to set TCP_NODELAY option.\n");
2192        close(tcp_addr_data->socket);
2193        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2194    }
2195
2196       bmi_set_sock_buffers(tcp_addr_data->socket);
2197
2198    if (tcp_addr_data->hostname)
2199    {
2200        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
2201                      "Connect: socket=%d, hostname=%s, port=%d\n",
2202                      tcp_addr_data->socket, tcp_addr_data->hostname,
2203                      tcp_addr_data->port);
2204        ret = BMI_sockio_connect_sock(tcp_addr_data->socket,
2205                      tcp_addr_data->hostname,
2206                      tcp_addr_data->port);
2207    }
2208    else
2209    {
2210        return (bmi_tcp_errno_to_pvfs(-EINVAL));
2211    }
2212
2213    if (ret < 0)
2214    {
2215        if (ret == -EINPROGRESS)
2216        {
2217            tcp_addr_data->not_connected = 1;
2218            /* this will have to be connected later with a poll */
2219        }
2220        else
2221        {
2222            /* NOTE: BMI_sockio_connect_sock returns a PVFS error */
2223            char buff[300];
2224
2225            snprintf(buff, 300, "Error: BMI_sockio_connect_sock: (%s):",
2226                     tcp_addr_data->hostname);
2227
2228            PVFS_perror_gossip(buff, ret);
2229            return (ret);
2230        }
2231    }
2232
2233    return (0);
2234}
2235
2236
2237/* enqueue_operation()
2238 *
2239 * creates a new operation based on the arguments to the function.  It
2240 * then makes sure that the address is added to the socket collection,
2241 * and the operation is added to the appropriate operation queue.
2242 *
2243 * Damn, what a big prototype!
2244 *
2245 * returns 0 on success, -errno on failure
2246 */
2247static int enqueue_operation(op_list_p target_list,
2248                             enum bmi_op_type send_recv,
2249                             bmi_method_addr_p map,
2250                             void *const *buffer_list,
2251                             const bmi_size_t *size_list,
2252                             int list_count,
2253                             bmi_size_t amt_complete,
2254                             bmi_size_t env_amt_complete,
2255                             bmi_op_id_t * id,
2256                             int tcp_op_state,
2257                             struct tcp_msg_header header,
2258                             void *user_ptr,
2259                             bmi_size_t actual_size,
2260                             bmi_size_t expected_size,
2261                             bmi_context_id context_id,
2262                             int32_t eid)
2263{
2264    method_op_p new_method_op = NULL;
2265    struct tcp_op *tcp_op_data = NULL;
2266    struct tcp_addr* tcp_addr_data = NULL;
2267    int i;
2268
2269    /* allocate the operation structure */
2270    new_method_op = alloc_tcp_method_op();
2271    if (!new_method_op)
2272    {
2273        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
2274    }
2275
2276    *id = new_method_op->op_id;
2277    new_method_op->event_id = eid;
2278
2279    /* set the fields */
2280    new_method_op->send_recv = send_recv;
2281    new_method_op->addr = map;
2282    new_method_op->user_ptr = user_ptr;
2283    /* this is on purpose; we want to use the buffer_list all of
2284     * the time, no special case for one contig buffer
2285     */
2286    new_method_op->buffer = NULL;
2287    new_method_op->actual_size = actual_size;
2288    new_method_op->expected_size = expected_size;
2289    new_method_op->send_recv = send_recv;
2290    new_method_op->amt_complete = amt_complete;
2291    new_method_op->env_amt_complete = env_amt_complete;
2292    new_method_op->msg_tag = header.tag;
2293    new_method_op->mode = header.mode;
2294    new_method_op->list_count = list_count;
2295    new_method_op->context_id = context_id;
2296
2297    /* set our current position in list processing */
2298    i=0;
2299    new_method_op->list_index = 0;
2300    new_method_op->cur_index_complete = 0;
2301    while(amt_complete > 0)
2302    {
2303        if(amt_complete >= size_list[i])
2304        {
2305            amt_complete -= size_list[i];
2306            new_method_op->list_index++;
2307            i++;
2308        }
2309        else
2310        {
2311            new_method_op->cur_index_complete = amt_complete;
2312            amt_complete = 0;
2313        }
2314    }
2315
2316    tcp_op_data = new_method_op->method_data;
2317    tcp_op_data->tcp_op_state = tcp_op_state;
2318    tcp_op_data->env = header;
2319
2320    /* if there is only one item in the list, then keep the list stored
2321     * in the op structure.  This allows us to use the same code for send
2322     * and recv as we use for send_list and recv_list, without having to
2323     * malloc lists for those special cases
2324     */
2325    if (list_count == 1)
2326    {
2327        new_method_op->buffer_list = &tcp_op_data->buffer_list_stub;
2328        new_method_op->size_list = &tcp_op_data->size_list_stub;
2329        ((void**)new_method_op->buffer_list)[0] = buffer_list[0];
2330        ((bmi_size_t*)new_method_op->size_list)[0] = size_list[0];
2331    }
2332    else
2333    {
2334        new_method_op->size_list = size_list;
2335        new_method_op->buffer_list = buffer_list;
2336    }
2337
2338    tcp_addr_data = map->method_data;
2339
2340    if(tcp_addr_data->addr_error)
2341    {
2342        /* server should always fail here, client should let receives queue
2343         * as if nothing were wrong
2344         */
2345        if(tcp_addr_data->dont_reconnect || send_recv == BMI_SEND)
2346        {
2347            gossip_debug(GOSSIP_BMI_DEBUG_TCP,
2348                       "Warning: BMI communication attempted on an "
2349                       "address in failure mode.\n");
2350            new_method_op->error_code = tcp_addr_data->addr_error;
2351            op_list_add(op_list_array[new_method_op->context_id],
2352                        new_method_op);
2353            return(tcp_addr_data->addr_error);
2354        }
2355    }
2356
2357#if 0
2358    if(tcp_addr_data->addr_error)
2359    {
2360        /* this address is bad, don't try to do anything with it */
2361        gossip_err("Warning: BMI communication attempted on an "
2362                   "address in failure mode.\n");
2363
2364        new_method_op->error_code = tcp_addr_data->addr_error;
2365        op_list_add(op_list_array[new_method_op->context_id],
2366                    new_method_op);
2367        return(tcp_addr_data->addr_error);
2368    }
2369#endif
2370
2371    /* add the socket to poll on */
2372    BMI_socket_collection_add(tcp_socket_collection_p, map);
2373    if(send_recv == BMI_SEND)
2374    {
2375        BMI_socket_collection_add_write_bit(tcp_socket_collection_p, map);
2376    }
2377
2378    /* keep up with the operation */
2379    op_list_add(target_list, new_method_op);
2380
2381    return (0);
2382}
2383
2384
2385/* tcp_post_recv_generic()
2386 *
2387 * does the real work of posting an operation - works for both
2388 * eager and rendezvous messages
2389 *
2390 * returns 0 on success that requires later poll, returns 1 on instant
2391 * completion, -errno on failure
2392 */
2393static int tcp_post_recv_generic(bmi_op_id_t * id,
2394                                 bmi_method_addr_p src,
2395                                 void *const *buffer_list,
2396                                 const bmi_size_t *size_list,
2397                                 int list_count,
2398                                 bmi_size_t expected_size,
2399                                 bmi_size_t * actual_size,
2400                                 enum bmi_buffer_type buffer_type,
2401                                 bmi_msg_tag_t tag,
2402                                 void *user_ptr,
2403                                 bmi_context_id context_id,
2404                                 PVFS_hint hints)
2405{
2406    method_op_p query_op = NULL;
2407    int ret = -1;
2408    struct tcp_addr *tcp_addr_data = NULL;
2409    struct tcp_op *tcp_op_data = NULL;
2410    struct tcp_msg_header bogus_header;
2411    struct op_list_search_key key;
2412    bmi_size_t copy_size = 0;
2413    bmi_size_t total_copied = 0;
2414    int i;
2415    PINT_event_id eid = 0;
2416
2417    PINT_EVENT_START(
2418        bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, &eid,
2419        PINT_HINT_GET_CLIENT_ID(hints),
2420        PINT_HINT_GET_REQUEST_ID(hints),
2421        PINT_HINT_GET_RANK(hints),
2422        PINT_HINT_GET_HANDLE(hints),
2423        PINT_HINT_GET_OP_ID(hints),
2424        expected_size);
2425
2426    tcp_addr_data = src->method_data;
2427
2428    /* short out immediately if the address is bad and we have no way to
2429     * reconnect
2430     */
2431    if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
2432    {
2433        gossip_debug(
2434            GOSSIP_BMI_DEBUG_TCP,
2435            "Warning: BMI communication attempted "
2436            "on an address in failure mode.\n");
2437        return(tcp_addr_data->addr_error);
2438    }
2439
2440    /* lets make sure that the message hasn't already been fully
2441     * buffered in eager mode before doing anything else
2442     */
2443    memset(&key, 0, sizeof(struct op_list_search_key));
2444    key.method_addr = src;
2445    key.method_addr_yes = 1;
2446    key.msg_tag = tag;
2447    key.msg_tag_yes = 1;
2448
2449    query_op =
2450        op_list_search(op_list_array[IND_RECV_EAGER_DONE_BUFFERING], &key);
2451    if (query_op)
2452    {
2453        /* make sure it isn't too big */
2454        if (query_op->actual_size > expected_size)
2455        {
2456            gossip_err("Error: message ordering violation;\n");
2457            gossip_err("Error: message too large for next buffer.\n");
2458            return (bmi_tcp_errno_to_pvfs(-EPROTO));
2459        }
2460
2461        /* whoohoo- it is already done! */
2462        /* copy buffer out to list segments; handle short case */
2463        for (i = 0; i < list_count; i++)
2464        {
2465            copy_size = size_list[i];
2466            if (copy_size + total_copied > query_op->actual_size)
2467            {
2468                copy_size = query_op->actual_size - total_copied;
2469            }
2470            memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
2471                                             total_copied), copy_size);
2472            total_copied += copy_size;
2473            if (total_copied == query_op->actual_size)
2474            {
2475                break;
2476            }
2477        }
2478        /* copy out to correct memory regions */
2479        (*actual_size) = query_op->actual_size;
2480        free(query_op->buffer);
2481        *id = 0;
2482        op_list_remove(query_op);
2483        dealloc_tcp_method_op(query_op);
2484        PINT_EVENT_END(bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid, 0,
2485                       *actual_size);
2486
2487        return (1);
2488    }
2489
2490    /* look for a message that is already being received */
2491    query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
2492    if (query_op)
2493    {
2494        tcp_op_data = query_op->method_data;
2495    }
2496
2497    /* see if it is being buffered into a temporary memory region */
2498    if (query_op && tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
2499    {
2500        /* make sure it isn't too big */
2501        if (query_op->actual_size > expected_size)
2502        {
2503            gossip_err("Error: message ordering violation;\n");
2504            gossip_err("Error: message too large for next buffer.\n");
2505            return (bmi_tcp_errno_to_pvfs(-EPROTO));
2506        }
2507
2508        /* copy what we have so far into the correct buffers */
2509        total_copied = 0;
2510        for (i = 0; i < list_count; i++)
2511        {
2512            copy_size = size_list[i];
2513            if (copy_size + total_copied > query_op->amt_complete)
2514            {
2515                copy_size = query_op->amt_complete - total_copied;
2516            }
2517            if (copy_size > 0)
2518            {
2519                memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
2520                                                 total_copied), copy_size);
2521            }
2522            total_copied += copy_size;
2523            if (total_copied == query_op->amt_complete)
2524            {
2525                query_op->list_index = i;
2526                query_op->cur_index_complete = copy_size;
2527                break;
2528            }
2529        }
2530
2531        /* see if we ended on a buffer boundary */
2532        if (query_op->cur_index_complete ==
2533            query_op->size_list[query_op->list_index])
2534        {
2535            query_op->list_index++;
2536            query_op->cur_index_complete = 0;
2537        }
2538
2539        /* release the old buffer */
2540        if (query_op->buffer)
2541        {
2542            free(query_op->buffer);
2543        }
2544
2545        *id = query_op->op_id;
2546        tcp_op_data = query_op->method_data;
2547        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
2548
2549        query_op->list_count = list_count;
2550        query_op->user_ptr = user_ptr;
2551        query_op->context_id = context_id;
2552        /* if there is only one item in the list, then keep the list stored
2553         * in the op structure.  This allows us to use the same code for send
2554         * and recv as we use for send_list and recv_list, without having to
2555         * malloc lists for those special cases
2556         */
2557        if (list_count == 1)
2558        {
2559            query_op->buffer_list = &tcp_op_data->buffer_list_stub;
2560            query_op->size_list = &tcp_op_data->size_list_stub;
2561            ((void **)query_op->buffer_list)[0] = buffer_list[0];
2562            ((bmi_size_t *)query_op->size_list)[0] = size_list[0];
2563        }
2564        else
2565        {
2566            query_op->buffer_list = buffer_list;
2567            query_op->size_list = size_list;
2568        }
2569
2570        if (query_op->amt_complete < query_op->actual_size)
2571        {
2572            /* try to recv some more data */
2573            tcp_addr_data = query_op->addr->method_data;
2574            ret = payload_progress(tcp_addr_data->socket,
2575                                   query_op->buffer_list,
2576                                   query_op->size_list,
2577                                   query_op->list_count,
2578                                   query_op->actual_size,
2579                                   &(query_op->list_index),
2580                                   &(query_op->cur_index_complete),
2581                                   BMI_RECV,
2582                                   NULL,
2583                                   0);
2584            if (ret < 0)
2585            {
2586                PVFS_perror_gossip("Error: payload_progress", ret);
2587                /* payload_progress() returns BMI error codes */
2588                tcp_forget_addr(query_op->addr, 0, ret);
2589                return (ret);
2590            }
2591
2592            query_op->amt_complete += ret;
2593        }
2594        assert(query_op->amt_complete <= query_op->actual_size);
2595        if (query_op->amt_complete == query_op->actual_size)
2596        {
2597            /* we are done */
2598            op_list_remove(query_op);
2599            *id = 0;
2600            (*actual_size) = query_op->actual_size;
2601            dealloc_tcp_method_op(query_op);
2602            PINT_EVENT_END(
2603                bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid,
2604                0, *actual_size);
2605
2606            return (1);
2607        }
2608        else
2609        {
2610            /* there is still more work to do */
2611            tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
2612            return (0);
2613        }
2614    }
2615
2616    /* NOTE: if the message was in flight, but not buffering, then
2617     * that means that it has already matched an earlier receive
2618     * post or else is an unexpected message that doesn't require a
2619     * matching receive post - at any rate it shouldn't be handled
2620     * here
2621     */
2622
2623    /* if we hit this point we must enqueue */
2624    if (expected_size <= TCP_MODE_EAGER_LIMIT)
2625    {
2626        bogus_header.mode = TCP_MODE_EAGER;
2627    }
2628    else
2629    {
2630        bogus_header.mode = TCP_MODE_REND;
2631    }
2632    bogus_header.tag = tag;
2633    ret = enqueue_operation(op_list_array[IND_RECV],
2634                            BMI_RECV, src, buffer_list, size_list,
2635                            list_count, 0, 0, id, BMI_TCP_INPROGRESS,
2636                            bogus_header, user_ptr, 0,
2637                            expected_size, context_id, eid);
2638    /* just for safety; this field isn't valid to the caller anymore */
2639    (*actual_size) = 0;
2640    /* TODO: figure out why this causes deadlocks; observable in 2
2641     * scenarios:
2642     * - pvfs2-client-core with threaded library and nptl
2643     * - pvfs2-server threaded with nptl sending messages to itself
2644     */
2645#if 0
2646    if (ret >= 0)
2647    {
2648        /* go ahead and try to do some work while we are in this
2649         * function since we appear to be backlogged.  Make sure that
2650         * we do not wait in the poll, however.
2651         */
2652        ret = tcp_do_work(0);
2653    }
2654#endif
2655    return (ret);
2656}
2657
2658
2659/* tcp_cleanse_addr()
2660 *
2661 * finds all active operations matching the given address, places them
2662 * in an error state, and moves them to the completed queue.
2663 *
2664 * NOTE: this function does not shut down the address.  That should be
2665 * handled separately
2666 *
2667 * returns 0 on success, -errno on failure
2668 */
2669static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code)
2670{
2671    int i = 0;
2672    struct op_list_search_key key;
2673    method_op_p query_op = NULL;
2674
2675    memset(&key, 0, sizeof(struct op_list_search_key));
2676    key.method_addr = map;
2677    key.method_addr_yes = 1;
2678
2679    /* NOTE: we know the unexpected completed queue is the last index! */
2680    for (i = 0; i < (NUM_INDICES - 1); i++)
2681    {
2682        if (op_list_array[i])
2683        {
2684            while ((query_op = op_list_search(op_list_array[i], &key)))
2685            {
2686                op_list_remove(query_op);
2687                query_op->error_code = error_code;
2688                if (query_op->mode == TCP_MODE_UNEXP && query_op->send_recv
2689                    == BMI_RECV)
2690                {
2691                    op_list_add(op_list_array[IND_COMPLETE_RECV_UNEXP],
2692                                query_op);
2693                }
2694                else
2695                {
2696                    ((struct tcp_op*)(query_op->method_data))->tcp_op_state =
2697                        BMI_TCP_COMPLETE;
2698                    op_list_add(completion_array[query_op->context_id], query_op);
2699                }
2700            }
2701        }
2702    }
2703
2704    return (0);
2705}
2706
2707
2708/* tcp_shutdown_addr()
2709 *
2710 * closes connections associated with a tcp method address
2711 *
2712 * returns 0 on success, -errno on failure
2713 */
2714static int tcp_shutdown_addr(bmi_method_addr_p map)
2715{
2716
2717    struct tcp_addr *tcp_addr_data = map->method_data;
2718    if (tcp_addr_data->socket > -1)
2719    {
2720        close(tcp_addr_data->socket);
2721    }
2722    tcp_addr_data->socket = -1;
2723    tcp_addr_data->not_connected = 1;
2724
2725    return (0);
2726}
2727
2728
2729/* tcp_do_work()
2730 *
2731 * this is the function that actually does communication work during
2732 * BMI_tcp_testXXX and BMI_tcp_waitXXX functions.  The amount of work
2733 * that it does is tunable.
2734 *
2735 * returns 0 on success, -errno on failure.
2736 */
2737static int tcp_do_work(int max_idle_time)
2738{
2739    int ret = -1;
2740    bmi_method_addr_p addr_array[TCP_WORK_METRIC];
2741    int status_array[TCP_WORK_METRIC];
2742    int socket_count = 0;
2743    int i = 0;
2744    int stall_flag = 0;
2745    int busy_flag = 1;
2746    struct timespec req;
2747    struct tcp_addr* tcp_addr_data = NULL;
2748    struct timespec wait_time;
2749    struct timeval start;
2750
2751    if(sc_test_busy)
2752    {
2753        /* another thread is already polling or working on sockets */
2754        if(max_idle_time == 0)
2755        {
2756            /* we don't want to spend time waiting on it; return
2757             * immediately.
2758             */
2759            return(0);
2760        }
2761
2762        /* Sleep until working thread thread signals that it has finished
2763         * its work and then return.  No need for this thread to poll;
2764         * the other thread may have already finished what we wanted.
2765         * This condition wait is used strictly as a best effort to
2766         * prevent busy spin.  We'll sort out the results later.
2767         */
2768        gettimeofday(&start, NULL);
2769        wait_time.tv_sec = start.tv_sec + max_idle_time / 1000;
2770        wait_time.tv_nsec = (start.tv_usec + ((max_idle_time % 1000)*1000))*1000;
2771        if (wait_time.tv_nsec > 1000000000)
2772        {
2773            wait_time.tv_nsec = wait_time.tv_nsec - 1000000000;
2774            wait_time.tv_sec++;
2775        }
2776        gen_cond_timedwait(&interface_cond, &interface_mutex, &wait_time);
2777        return(0);
2778    }
2779
2780    /* this thread has gained control of the polling.  */
2781    sc_test_busy = 1;
2782    gen_mutex_unlock(&interface_mutex);
2783
2784    /* our turn to look at the socket collection */
2785    ret = BMI_socket_collection_testglobal(tcp_socket_collection_p,
2786                                       TCP_WORK_METRIC, &socket_count,
2787                                       addr_array, status_array,
2788                                       max_idle_time);
2789
2790    gen_mutex_lock(&interface_mutex);
2791    sc_test_busy = 0;
2792
2793    if (ret < 0)
2794    {
2795        /* wake up anyone else who might have been waiting */
2796        gen_cond_broadcast(&interface_cond);
2797        PVFS_perror_gossip("Error: socket collection:", ret);
2798        /* BMI_socket_collection_testglobal() returns BMI error code */
2799        return (ret);
2800    }
2801
2802    if(socket_count == 0)
2803        busy_flag = 0;
2804
2805    /* do different kinds of work depending on results */
2806    for (i = 0; i < socket_count; i++)
2807    {
2808        tcp_addr_data = addr_array[i]->method_data;
2809        /* skip working on addresses in failure mode */
2810        if(tcp_addr_data->addr_error)
2811        {
2812            /* addr_error field is in BMI error code format */
2813            tcp_forget_addr(addr_array[i], 0, tcp_addr_data->addr_error);
2814            continue;
2815        }
2816
2817        if (status_array[i] & SC_ERROR_BIT)
2818        {
2819            ret = tcp_do_work_error(addr_array[i]);
2820            if (ret < 0)
2821            {
2822                PVFS_perror_gossip("Warning: BMI error handling failure, continuing", ret);
2823            }
2824        }
2825        else
2826        {
2827            if (status_array[i] & SC_WRITE_BIT)
2828            {
2829                ret = tcp_do_work_send(addr_array[i], &stall_flag);
2830                if (ret < 0)
2831                {
2832                    PVFS_perror_gossip("Warning: BMI send error, continuing", ret);
2833                }
2834                if(!stall_flag)
2835                    busy_flag = 0;
2836            }
2837            if (status_array[i] & SC_READ_BIT)
2838            {
2839                ret = tcp_do_work_recv(addr_array[i], &stall_flag);
2840                if (ret < 0)
2841                {
2842                    PVFS_perror_gossip("Warning: BMI recv error, continuing", ret);
2843                }
2844                if(!stall_flag)
2845                    busy_flag = 0;
2846            }
2847        }
2848    }
2849
2850    /* IMPORTANT NOTE: if we have set the following flag, then it indicates that
2851     * poll() is finding data on our sockets, yet we are not able to move
2852     * any of it right now.  This means that the sockets are backlogged, and
2853     * BMI is in danger of busy spinning during test functions.  Let's sleep
2854     * for a millisecond here in hopes of letting the rest of the system
2855     * catch up somehow (either by clearing a backlog in another I/O
2856     * component, or by posting more matching BMI recieve operations)
2857     */
2858    if(busy_flag)
2859    {
2860        req.tv_sec = 0;
2861        req.tv_nsec = 1000;
2862        gen_mutex_unlock(&interface_mutex);
2863        nanosleep(&req, NULL);
2864        gen_mutex_lock(&interface_mutex);
2865    }
2866
2867    /* wake up anyone else who might have been waiting */
2868    gen_cond_broadcast(&interface_cond);
2869    return (0);
2870}
2871
2872
2873/* tcp_do_work_send()
2874 *
2875 * does work on a TCP address that is ready to send data.
2876 *
2877 * returns 0 on success, -errno on failure
2878 */
2879static int tcp_do_work_send(bmi_method_addr_p map, int* stall_flag)
2880{
2881    method_op_p active_method_op = NULL;
2882    struct op_list_search_key key;
2883    int blocked_flag = 0;
2884    int ret = 0;
2885    int tmp_stall_flag;
2886
2887    *stall_flag = 1;
2888
2889    while (blocked_flag == 0 && ret == 0)
2890    {
2891        /* what we want to do here is find the first operation in the send
2892         * queue for this address.
2893         */
2894        memset(&key, 0, sizeof(struct op_list_search_key));
2895        key.method_addr = map;
2896        key.method_addr_yes = 1;
2897        active_method_op = op_list_search(op_list_array[IND_SEND], &key);
2898        if (!active_method_op)
2899        {
2900            /* ran out of queued sends to work on */
2901            return (0);
2902        }
2903
2904        ret = work_on_send_op(active_method_op, &blocked_flag, &tmp_stall_flag);
2905        if(!tmp_stall_flag)
2906            *stall_flag = 0;
2907    }
2908
2909    return (ret);
2910}
2911
2912
2913/* handle_new_connection()
2914 *
2915 * this function should be called only on special tcp method addresses
2916 * that represent local server ports.  It will attempt to accept a new
2917 * connection and create a new method address for the remote host.
2918 *
2919 * side effect: destroys the temporary method_address that is passed in
2920 * to it.
2921 *
2922 * returns 0 on success, -errno on failure
2923 */
2924static int handle_new_connection(bmi_method_addr_p map)
2925{
2926    struct tcp_addr *tcp_addr_data = NULL;
2927    int accepted_socket = -1;
2928    bmi_method_addr_p new_addr = NULL;
2929    int ret = -1;
2930    char* tmp_peer = NULL;
2931
2932    ret = tcp_accept_init(&accepted_socket, &tmp_peer);
2933    if (ret < 0)
2934    {
2935        return (ret);
2936    }
2937    if (accepted_socket < 0)
2938    {
2939        /* guess it wasn't ready after all */
2940        return (0);
2941    }
2942
2943    /* ok, we have a new socket.  what now?  Probably simplest
2944     * thing to do is to create a new method_addr, add it to the
2945     * socket collection, and return.  It will get caught the next
2946     * time around */
2947    new_addr = alloc_tcp_method_addr();
2948    if (!new_addr)
2949    {
2950        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
2951    }
2952    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
2953                  "Assigning socket %d to new method addr.\n",
2954                  accepted_socket);
2955    tcp_addr_data = new_addr->method_data;
2956    tcp_addr_data->socket = accepted_socket;
2957    tcp_addr_data->peer = tmp_peer;
2958    tcp_addr_data->peer_type = BMI_TCP_PEER_IP;
2959
2960    /* set a flag to make sure that we never try to reconnect this address
2961     * in the future
2962     */
2963    tcp_addr_data->dont_reconnect = 1;
2964    /* register this address with the method control layer */
2965    tcp_addr_data->bmi_addr = bmi_method_addr_reg_callback(new_addr);
2966    if (ret < 0)
2967    {
2968        tcp_shutdown_addr(new_addr);
2969        dealloc_tcp_method_addr(new_addr);
2970        dealloc_tcp_method_addr(map);
2971        return (ret);
2972    }
2973    BMI_socket_collection_add(tcp_socket_collection_p, new_addr);
2974
2975    dealloc_tcp_method_addr(map);
2976    return (0);
2977
2978}
2979
2980
2981/* tcp_do_work_recv()
2982 *
2983 * does work on a TCP address that is ready to recv data.
2984 *
2985 * returns 0 on success, -errno on failure
2986 */
2987static int tcp_do_work_recv(bmi_method_addr_p map, int* stall_flag)
2988{
2989
2990    method_op_p active_method_op = NULL;
2991    int ret = -1;
2992    void *new_buffer = NULL;
2993    struct op_list_search_key key;
2994    struct tcp_msg_header new_header;
2995    struct tcp_addr *tcp_addr_data = map->method_data;
2996    struct tcp_op *tcp_op_data = NULL;
2997    int tmp_errno;
2998    int tmp;
2999    bmi_size_t old_amt_complete = 0;
3000    time_t current_time;
3001
3002    *stall_flag = 1;
3003
3004    /* figure out if this is a new connection */
3005    if (tcp_addr_data->server_port)
3006    {
3007        /* just try to accept connection- no work yet */
3008        *stall_flag = 0;
3009        return (handle_new_connection(map));
3010    }
3011
3012    /* look for a recv for this address that is already in flight */
3013    active_method_op = find_recv_inflight(map);
3014    /* see if we found one in progress... */
3015    if (active_method_op)
3016    {
3017        tcp_op_data = active_method_op->method_data;
3018        if (active_method_op->mode == TCP_MODE_REND &&
3019            tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
3020        {
3021            /* we must wait for recv post */
3022            return (0);
3023        }
3024        else
3025        {
3026            old_amt_complete = active_method_op->amt_complete;
3027            ret = work_on_recv_op(active_method_op, stall_flag);
3028            gossip_debug(GOSSIP_BMI_DEBUG_TCP, "actual_size=%d, "
3029                         "amt_complete=%d, old_amt_complete=%d\n",
3030                         (int)active_method_op->actual_size,
3031                         (int)active_method_op->amt_complete,
3032                         (int)old_amt_complete);
3033
3034            if ((ret == 0) &&
3035                (old_amt_complete == active_method_op->amt_complete) &&
3036                active_method_op->actual_size &&
3037                (active_method_op->amt_complete <
3038                 active_method_op->actual_size))
3039            {
3040                gossip_debug(
3041                    GOSSIP_BMI_DEBUG_TCP, "Warning: bmi_tcp unable "
3042                    "to recv any data reported by poll(). [1]\n");
3043
3044                if (tcp_addr_data->zero_read_limit++ ==
3045                    BMI_TCP_ZERO_READ_LIMIT)
3046                {
3047                    gossip_debug(GOSSIP_BMI_DEBUG_TCP,
3048                                 "...dropping connection.\n");
3049                    tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3050                }
3051            }
3052            else
3053            {
3054                tcp_addr_data->zero_read_limit = 0;
3055            }
3056            return(ret);
3057        }
3058    }
3059
3060    /* let's see if a the entire header is ready to be received.  If so
3061     * we will go ahead and pull it.  Otherwise, we will try again later.
3062     * It isn't worth the complication of reading only a partial message
3063     * header - we really want it atomically
3064     */
3065    ret = BMI_sockio_nbpeek(tcp_addr_data->socket,
3066                            new_header.enc_hdr, TCP_ENC_HDR_SIZE);
3067    if (ret < 0)
3068    {
3069        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-errno));
3070        return (0);
3071    }
3072
3073    if (ret == 0)
3074    {
3075        gossip_debug(
3076            GOSSIP_BMI_DEBUG_TCP, "Warning: bmi_tcp unable "
3077            "to recv any data reported by poll(). [2]\n");
3078
3079        if (tcp_addr_data->zero_read_limit++ ==
3080            BMI_TCP_ZERO_READ_LIMIT)
3081        {
3082            gossip_debug(GOSSIP_BMI_DEBUG_TCP,
3083                         "...dropping connection.\n");
3084            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3085        }
3086        return(0);
3087    }
3088    else
3089    {
3090        tcp_addr_data->zero_read_limit = 0;
3091    }
3092
3093    if (ret < TCP_ENC_HDR_SIZE)
3094    {
3095        current_time = time(NULL);
3096        if(!tcp_addr_data->short_header_timer)
3097        {
3098            tcp_addr_data->short_header_timer = current_time;
3099        }
3100        else if((current_time - tcp_addr_data->short_header_timer) >
3101            BMI_TCP_HEADER_WAIT_SECONDS)
3102        {
3103            gossip_err("Error: incomplete BMI TCP header after %d seconds, closing connection.\n",
3104                BMI_TCP_HEADER_WAIT_SECONDS);
3105            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3106            return (0);
3107        }
3108
3109        /* header not ready yet, but we will keep hoping */
3110        return (0);
3111    }
3112
3113    tcp_addr_data->short_header_timer = 0;
3114    *stall_flag = 0;
3115    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Reading header for new op.\n");
3116    ret = BMI_sockio_nbrecv(tcp_addr_data->socket,
3117                           new_header.enc_hdr, TCP_ENC_HDR_SIZE);
3118    if (ret < TCP_ENC_HDR_SIZE)
3119    {
3120        tmp_errno = errno;
3121        gossip_err("Error: BMI_sockio_nbrecv: %s\n", strerror(tmp_errno));
3122        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-tmp_errno));
3123        return (0);
3124    }
3125
3126    /* decode the header */
3127    BMI_TCP_DEC_HDR(new_header);
3128
3129    /* so we have the header. now what?  These are the possible
3130     * scenarios:
3131     * a) unexpected message
3132     * b) eager message for which a recv has been posted
3133     * c) eager message for which a recv has not been posted
3134     * d) rendezvous messsage for which a recv has been posted
3135     * e) rendezvous messsage for which a recv has not been posted
3136     * f) eager message for which a rend. recv has been posted
3137     */
3138
3139    /* check magic number of message */
3140    if(new_header.magic_nr != BMI_MAGIC_NR)
3141    {
3142        gossip_err("Error: bad magic in BMI TCP message.\n");
3143        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EBADMSG));
3144        return(0);
3145    }
3146
3147    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Received new message; mode: %d.\n",
3148                  (int) new_header.mode);
3149    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "tag: %d\n", (int) new_header.tag);
3150
3151    if (new_header.mode == TCP_MODE_UNEXP)
3152    {
3153        /* allocate the operation structure */
3154        active_method_op = alloc_tcp_method_op();
3155        if (!active_method_op)
3156        {
3157            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3158            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3159        }
3160        /* create data buffer */
3161        new_buffer = malloc(new_header.size);
3162        if (!new_buffer)
3163        {
3164            dealloc_tcp_method_op(active_method_op);
3165            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3166            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3167        }
3168
3169        /* set the fields */
3170        active_method_op->send_recv = BMI_RECV;
3171        active_method_op->addr = map;
3172        active_method_op->actual_size = new_header.size;
3173        active_method_op->expected_size = 0;
3174        active_method_op->amt_complete = 0;
3175        active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3176        active_method_op->msg_tag = new_header.tag;
3177        active_method_op->buffer = new_buffer;
3178        active_method_op->mode = TCP_MODE_UNEXP;
3179        active_method_op->buffer_list = &(active_method_op->buffer);
3180        active_method_op->size_list = &(active_method_op->actual_size);
3181        active_method_op->list_count = 1;
3182        tcp_op_data = active_method_op->method_data;
3183        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3184        tcp_op_data->env = new_header;
3185
3186        op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3187        /* grab some data if we can */
3188        return (work_on_recv_op(active_method_op, &tmp));
3189    }
3190
3191    memset(&key, 0, sizeof(struct op_list_search_key));
3192    key.method_addr = map;
3193    key.method_addr_yes = 1;
3194    key.msg_tag = new_header.tag;
3195    key.msg_tag_yes = 1;
3196
3197    /* look for a match within the posted operations */
3198    active_method_op = op_list_search(op_list_array[IND_RECV], &key);
3199
3200    if (active_method_op)
3201    {
3202        /* make sure it isn't too big */
3203        if (new_header.size > active_method_op->expected_size)
3204        {
3205            gossip_err("Error: message ordering violation;\n");
3206            gossip_err("Error: message too large for next buffer.\n");
3207            gossip_err("Error: incoming size: %ld, expected size: %ld\n",
3208                        (long) new_header.size,
3209                        (long) active_method_op->expected_size);
3210            /* TODO: return error here or do something else? */
3211            return (bmi_tcp_errno_to_pvfs(-EPROTO));
3212        }
3213
3214        /* we found a match.  go work on it and return */
3215        op_list_remove(active_method_op);
3216        active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3217        active_method_op->actual_size = new_header.size;
3218        op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3219        return (work_on_recv_op(active_method_op, &tmp));
3220    }
3221
3222    /* no match anywhere.  Start a new operation */
3223    /* allocate the operation structure */
3224    active_method_op = alloc_tcp_method_op();
3225    if (!active_method_op)
3226    {
3227        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3228        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3229    }
3230
3231    if (new_header.mode == TCP_MODE_EAGER)
3232    {
3233        /* create data buffer for eager messages */
3234        new_buffer = malloc(new_header.size);
3235        if (!new_buffer)
3236        {
3237            dealloc_tcp_method_op(active_method_op);
3238            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3239            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3240        }
3241    }
3242    else
3243    {
3244        new_buffer = NULL;
3245    }
3246
3247    /* set the fields */
3248    active_method_op->send_recv = BMI_RECV;
3249    active_method_op->addr = map;
3250    active_method_op->actual_size = new_header.size;
3251    active_method_op->expected_size = 0;
3252    active_method_op->amt_complete = 0;
3253    active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3254    active_method_op->msg_tag = new_header.tag;
3255    active_method_op->buffer = new_buffer;
3256    active_method_op->mode = new_header.mode;
3257    active_method_op->buffer_list = &(active_method_op->buffer);
3258    active_method_op->size_list = &(active_method_op->actual_size);
3259    active_method_op->list_count = 1;
3260    tcp_op_data = active_method_op->method_data;
3261    tcp_op_data->tcp_op_state = BMI_TCP_BUFFERING;
3262    tcp_op_data->env = new_header;
3263
3264    op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3265
3266    /* grab some data if we can */
3267    if (new_header.mode == TCP_MODE_EAGER)
3268    {
3269        return (work_on_recv_op(active_method_op, &tmp));
3270    }
3271
3272    return (0);
3273}
3274
3275
3276/*
3277 * work_on_send_op()
3278 *
3279 * used to perform work on a send operation.  this is called by the poll
3280 * function.
3281 *
3282 * sets blocked_flag if no more work can be done on socket without
3283 * blocking
3284 * returns 0 on success, -errno on failure.
3285 */
3286static int work_on_send_op(method_op_p my_method_op,
3287                           int *blocked_flag, int* stall_flag)
3288{
3289    int ret = -1;
3290    struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
3291    struct tcp_op *tcp_op_data = my_method_op->method_data;
3292
3293    *blocked_flag = 1;
3294    *stall_flag = 0;
3295
3296    /* make sure that the connection is done before we continue */
3297    if (tcp_addr_data->not_connected)
3298    {
3299        ret = tcp_sock_init(my_method_op->addr);
3300        if (ret < 0)
3301        {
3302            PVFS_perror_gossip("Error: socket failed to init", ret);
3303            /* tcp_sock_init() returns BMI error code */
3304            tcp_forget_addr(my_method_op->addr, 0, ret);
3305            return (0);
3306        }
3307        if (tcp_addr_data->not_connected)
3308        {
3309            /* try again later- still could not connect */
3310            tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3311            return (0);
3312        }
3313    }
3314
3315    ret = payload_progress(tcp_addr_data->socket,
3316        my_method_op->buffer_list,
3317        my_method_op->size_list,
3318        my_method_op->list_count,
3319        my_method_op->actual_size,
3320        &(my_method_op->list_index),
3321        &(my_method_op->cur_index_complete),
3322        BMI_SEND,
3323        tcp_op_data->env.enc_hdr,
3324        &my_method_op->env_amt_complete);
3325    if (ret < 0)
3326    {
3327        PVFS_perror_gossip("Error: payload_progress", ret);
3328        /* payload_progress() returns BMI error codes */
3329        tcp_forget_addr(my_method_op->addr, 0, ret);
3330        return (0);
3331    }
3332
3333    if(ret == 0)
3334        *stall_flag = 1;
3335
3336    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
3337    my_method_op->amt_complete += ret;
3338    assert(my_method_op->amt_complete <= my_method_op->actual_size);
3339
3340    if (my_method_op->amt_complete == my_method_op->actual_size && my_method_op->env_amt_complete == TCP_ENC_HDR_SIZE)
3341    {
3342        /* we are done */
3343        my_method_op->error_code = 0;
3344        BMI_socket_collection_remove_write_bit(tcp_socket_collection_p,
3345                                           my_method_op->addr);
3346        op_list_remove(my_method_op);
3347        ((struct tcp_op*)(my_method_op->method_data))->tcp_op_state =
3348            BMI_TCP_COMPLETE;
3349        op_list_add(completion_array[my_method_op->context_id], my_method_op);
3350        *blocked_flag = 0;
3351    }
3352    else
3353    {
3354        /* there is still more work to do */
3355        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3356    }
3357
3358    return (0);
3359}
3360
3361
3362/*
3363 * work_on_recv_op()
3364 *
3365 * used to perform work on a recv operation.  this is called by the poll
3366 * function.
3367 * NOTE: this function assumes the method header has already been read.
3368 *
3369 * returns 0 on success, -errno on failure.
3370 */
3371static int work_on_recv_op(method_op_p my_method_op, int* stall_flag)
3372{
3373
3374    int ret = -1;
3375    struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
3376    struct tcp_op *tcp_op_data = my_method_op->method_data;
3377
3378    *stall_flag = 1;
3379
3380    if (my_method_op->actual_size != 0)
3381    {
3382        /* now let's try to recv some actual data */
3383        ret = payload_progress(tcp_addr_data->socket,
3384            my_method_op->buffer_list,
3385            my_method_op->size_list,
3386            my_method_op->list_count,
3387            my_method_op->actual_size,
3388            &(my_method_op->list_index),
3389            &(my_method_op->cur_index_complete),
3390            BMI_RECV,
3391            NULL,
3392            0);
3393        if (ret < 0)
3394        {
3395            PVFS_perror_gossip("Error: payload_progress", ret);
3396            /* payload_progress() returns BMI error codes */
3397            tcp_forget_addr(my_method_op->addr, 0, ret);
3398            return (0);
3399        }
3400    }
3401    else
3402    {
3403        ret = 0;
3404    }
3405
3406    if(ret > 0)
3407        *stall_flag = 0;
3408
3409    my_method_op->amt_complete += ret;
3410    assert(my_method_op->amt_complete <= my_method_op->actual_size);
3411
3412    if (my_method_op->amt_complete == my_method_op->actual_size)
3413    {
3414        /* we are done */
3415        op_list_remove(my_method_op);
3416        if (tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
3417        {
3418            /* queue up to wait on matching post recv */
3419            op_list_add(op_list_array[IND_RECV_EAGER_DONE_BUFFERING],
3420                        my_method_op);
3421        }
3422        else
3423        {
3424            my_method_op->error_code = 0;
3425            if (my_method_op->mode == TCP_MODE_UNEXP)
3426            {
3427                op_list_add(op_list_array[IND_COMPLETE_RECV_UNEXP],
3428                            my_method_op);
3429            }
3430            else
3431            {
3432                ((struct tcp_op*)(my_method_op->method_data))->tcp_op_state =
3433                    BMI_TCP_COMPLETE;
3434                op_list_add(completion_array[my_method_op->context_id], my_method_op);
3435            }
3436        }
3437    }
3438
3439    return (0);
3440}
3441
3442
3443/* tcp_do_work_error()
3444 *
3445 * handles a tcp address that has indicated an error during polling.
3446 *
3447 * returns 0 on success, -errno on failure
3448 */
3449static int tcp_do_work_error(bmi_method_addr_p map)
3450{
3451    struct tcp_addr *tcp_addr_data = NULL;
3452    int buf;
3453    int ret;
3454    int tmp_errno;
3455
3456    tcp_addr_data = map->method_data;
3457
3458    /* perform a read on the socket so that we can get a real errno */
3459    ret = read(tcp_addr_data->socket, &buf, sizeof(int));
3460    if (ret == 0)
3461        tmp_errno = EPIPE;  /* report other side closed socket with this */
3462    else
3463        tmp_errno = errno;
3464
3465    gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Error: bmi_tcp: %s\n",
3466      strerror(tmp_errno));
3467
3468    if (tcp_addr_data->server_port)
3469    {
3470        /* Ignore this and hope it goes away... we don't want to lose
3471         * our local socket */
3472        dealloc_tcp_method_addr(map);
3473        gossip_lerr("Warning: error polling on server socket, continuing.\n");
3474        return (0);
3475    }
3476
3477    if(tmp_errno == 0)
3478        tmp_errno = EPROTO;
3479
3480    tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-tmp_errno));
3481
3482    return (0);
3483}
3484
3485#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
3486/*
3487 * tcp_enable_trusted()
3488 * Ideally, this function should look up the security configuration of
3489 * the server and determines
3490 * if it needs to bind to any specific port locally or not..
3491 * For now look at the FIXME below.
3492 */
3493static int tcp_enable_trusted(struct tcp_addr *tcp_addr_data)
3494{
3495    /*
3496     * FIXME:
3497     * For now, there is no way for us to check if a given
3498     * server is actually using port protection or not.
3499     * For now we unconditionally use a trusted port range
3500     * as long as USE_TRUSTED is #defined.
3501     *
3502     * Although most of the time we expect users
3503     * to be using a range of 0-1024, it is hard to keep probing
3504     * until one gets a port in the range specified.
3505     * Hence this is a temporary fix. we will see if this
3506     * requirement even needs to be met at all.
3507     */
3508    static unsigned short my_requested_port = 1023;
3509    unsigned short my_local_port = 0;
3510    struct sockaddr_in my_local_sockaddr;
3511    socklen_t len = sizeof(struct sockaddr_in);
3512    memset(&my_local_sockaddr, 0, sizeof(struct sockaddr_in));
3513
3514    /* setup for a fast restart to avoid bind addr in use errors */
3515    if (BMI_sockio_set_sockopt(tcp_addr_data->socket, SO_REUSEADDR, 1) < 0)
3516    {
3517        gossip_lerr("Could not set SO_REUSEADDR on local socket (port %hd)\n", my_local_port);
3518    }
3519    if (BMI_sockio_bind_sock(tcp_addr_data->socket, my_requested_port) < 0)
3520    {
3521        gossip_lerr("Could not bind to local port %hd: %s\n",
3522                my_requested_port, strerror(errno));
3523    }
3524    else {
3525        my_requested_port--;
3526    }
3527    my_local_sockaddr.sin_family = AF_INET;
3528    if (getsockname(tcp_addr_data->socket,
3529                (struct sockaddr *)&my_local_sockaddr, &len) == 0)
3530    {
3531        my_local_port = ntohs(my_local_sockaddr.sin_port);
3532    }
3533    gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Bound locally to port: %hd\n", my_local_port);
3534    return 0;
3535}
3536
3537#endif
3538
3539#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
3540
3541static char *bad_errors[] = {
3542    "invalid network address",
3543    "invalid port",
3544    "invalid network address and port"
3545};
3546
3547/*
3548 * tcp_allow_trusted()
3549 * if trusted ports was enabled make sure
3550 * that we can accept a particular connection from a given
3551 * client
3552 */
3553static int tcp_allow_trusted(struct sockaddr_in *peer_sockaddr)
3554{
3555    char *peer_hostname = inet_ntoa(peer_sockaddr->sin_addr);
3556    unsigned short peer_port = ntohs(peer_sockaddr->sin_port);
3557    int   i, what_failed   = -1;
3558
3559    /* Don't refuse connects if there were any
3560     * parse errors or if it is not enabled in the config file
3561     */
3562    if (gtcp_allowed_connection->port_enforce == 0
3563            && gtcp_allowed_connection->network_enforce == 0)
3564    {
3565        return 0;
3566    }
3567    /* make sure that the client is within the allowed network */
3568    if (gtcp_allowed_connection->network_enforce == 1)
3569    {
3570        /* Always allow localhost to connect */
3571        if (ntohl(peer_sockaddr->sin_addr.s_addr) == INADDR_LOOPBACK)
3572        {
3573            goto port_check;
3574        }
3575        for (i = 0; i < gtcp_allowed_connection->network_count; i++)
3576        {
3577            /* check with all the masks */
3578            if ((peer_sockaddr->sin_addr.s_addr & gtcp_allowed_connection->netmask[i].s_addr)
3579                    != (gtcp_allowed_connection->network[i].s_addr & gtcp_allowed_connection->netmask[i].s_addr ))
3580            {
3581                continue;
3582            }
3583            else {
3584                goto port_check;
3585            }
3586        }
3587        /* not from a trusted network */
3588        what_failed = 0;
3589    }
3590port_check:
3591    /* make sure that the client port numbers are within specified limits */
3592    if (gtcp_allowed_connection->port_enforce == 1)
3593    {
3594        if (peer_port < gtcp_allowed_connection->ports[0]
3595                || peer_port > gtcp_allowed_connection->ports[1])
3596        {
3597            what_failed = (what_failed < 0) ? 1 : 2;
3598        }
3599    }
3600    /* okay, we are good to go */
3601    if (what_failed < 0)
3602    {
3603        return 0;
3604    }
3605    /* no good */
3606    gossip_err("Rejecting client %s on port %d: %s\n",
3607           peer_hostname, peer_port, bad_errors[what_failed]);
3608    return -1;
3609}
3610
3611#endif
3612
3613/*
3614 * tcp_accept_init()
3615 *
3616 * used to establish a connection from the server side.  Attempts an
3617 * accept call and provides the socket if it succeeds.
3618 *
3619 * returns 0 on success, -errno on failure.
3620 */
3621static int tcp_accept_init(int *socket, char** peer)
3622{
3623
3624    int ret = -1;
3625    int tmp_errno = 0;
3626    struct tcp_addr *tcp_addr_data = tcp_method_params.listen_addr->method_data;
3627    int oldfl = 0;
3628    struct sockaddr_in peer_sockaddr;
3629    int peer_sockaddr_size = sizeof(struct sockaddr_in);
3630    char* tmp_peer;
3631
3632    /* do we have a socket on this end yet? */
3633    if (tcp_addr_data->socket < 0)
3634    {
3635        ret = tcp_server_init();
3636        if (ret < 0)
3637        {
3638            return (ret);
3639        }
3640    }
3641
3642    *socket = accept(tcp_addr_data->socket, (struct sockaddr*)&peer_sockaddr,
3643              (socklen_t *)&peer_sockaddr_size);
3644
3645    if (*socket < 0)
3646    {
3647        if ((errno == EAGAIN) ||
3648            (errno == EWOULDBLOCK) ||
3649            (errno == ENETDOWN) ||
3650            (errno == EPROTO) ||
3651            (errno == ENOPROTOOPT) ||
3652            (errno == EHOSTDOWN) ||
3653            (errno == ENONET) ||
3654            (errno == EHOSTUNREACH) ||
3655            (errno == EOPNOTSUPP) ||
3656            (errno == ENETUNREACH) ||
3657            (errno == ENFILE) ||
3658            (errno == EMFILE))
3659        {
3660            /* try again later */
3661            if ((errno == ENFILE) || (errno == EMFILE))
3662            {
3663                gossip_err("Error: accept: %s (continuing)\n",strerror(errno));
3664                bmi_method_addr_drop_callback(BMI_tcp_method_name);
3665            }
3666            return (0);
3667        }
3668        else
3669        {
3670            gossip_err("Error: accept: %s\n", strerror(errno));
3671            return (bmi_tcp_errno_to_pvfs(-errno));
3672        }
3673    }
3674
3675#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
3676
3677    /* make sure that we are allowed to accept this connection */
3678    if (tcp_allow_trusted(&peer_sockaddr) < 0)
3679    {
3680        /* Force closure of the connection */
3681        close(*socket);
3682        return (bmi_tcp_errno_to_pvfs(-EACCES));
3683    }
3684
3685#endif
3686
3687    /* we accepted a new connection.  turn off Nagle's algorithm. */
3688    if (BMI_sockio_set_tcpopt(*socket, TCP_NODELAY, 1) < 0)
3689    {
3690        tmp_errno = errno;
3691        gossip_lerr("Error: failed to set TCP_NODELAY option.\n");
3692        close(*socket);
3693        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
3694    }
3695
3696    /* set it to non-blocking operation */
3697    oldfl = fcntl(*socket, F_GETFL, 0);
3698    if (!(oldfl & O_NONBLOCK))
3699    {
3700        fcntl(*socket, F_SETFL, oldfl | O_NONBLOCK);
3701    }
3702
3703    /* allocate ip address string */
3704    tmp_peer = inet_ntoa(peer_sockaddr.sin_addr);
3705    *peer = (char*)malloc(strlen(tmp_peer)+1);
3706    if(!(*peer))
3707    {
3708        close(*socket);
3709        return(bmi_tcp_errno_to_pvfs(-BMI_ENOMEM));
3710    }
3711    strcpy(*peer, tmp_peer);
3712
3713    return (0);
3714}
3715
3716
3717/* alloc_tcp_method_op()
3718 *
3719 * creates a new method op with defaults filled in for tcp.
3720 *
3721 * returns pointer to structure on success, NULL on failure
3722 */
3723static method_op_p alloc_tcp_method_op(void)
3724{
3725    method_op_p my_method_op = NULL;
3726
3727    my_method_op = bmi_alloc_method_op(sizeof(struct tcp_op));
3728
3729    /* we trust alloc_method_op to zero it out */
3730
3731    return (my_method_op);
3732}
3733
3734
3735/* dealloc_tcp_method_op()
3736 *
3737 * destroys an existing tcp method op, freeing segment lists if
3738 * needed
3739 *
3740 * no return value
3741 */
3742static void dealloc_tcp_method_op(method_op_p old_op)
3743{
3744    bmi_dealloc_method_op(old_op);
3745    return;
3746}
3747
3748/* tcp_post_send_generic()
3749 *
3750 * Submits send operations (low level).
3751 *
3752 * returns 0 on success that requires later poll, returns 1 on instant
3753 * completion, -errno on failure
3754 */
3755static int tcp_post_send_generic(bmi_op_id_t * id,
3756                                 bmi_method_addr_p dest,
3757                                 const void *const *buffer_list,
3758                                 const bmi_size_t *size_list,
3759                                 int list_count,
3760                                 enum bmi_buffer_type buffer_type,
3761                                 struct tcp_msg_header my_header,
3762                                 void *user_ptr,
3763                                 bmi_context_id context_id,
3764                                 PVFS_hint hints)
3765{
3766    struct tcp_addr *tcp_addr_data = dest->method_data;
3767    method_op_p query_op = NULL;
3768    int ret = -1;
3769    bmi_size_t total_size = 0;
3770    bmi_size_t amt_complete = 0;
3771    bmi_size_t env_amt_complete = 0;
3772    struct op_list_search_key key;
3773    int list_index = 0;
3774    bmi_size_t cur_index_complete = 0;
3775    PINT_event_id eid = 0;
3776
3777    if(PINT_EVENT_ENABLED)
3778    {
3779        int i = 0;
3780        for(; i < list_count; ++i)
3781        {
3782            total_size += size_list[i];
3783        }
3784    }
3785
3786    PINT_EVENT_START(
3787        bmi_tcp_send_event_id, bmi_tcp_pid, NULL, &eid,
3788        PINT_HINT_GET_CLIENT_ID(hints),
3789        PINT_HINT_GET_REQUEST_ID(hints),
3790        PINT_HINT_GET_RANK(hints),
3791        PINT_HINT_GET_HANDLE(hints),
3792        PINT_HINT_GET_OP_ID(hints),
3793        total_size);
3794
3795    /* Three things can happen here:
3796     * a) another op is already in queue for the address, so we just
3797     * queue up
3798     * b) we can send the whole message and return
3799     * c) we send part of the message and queue the rest
3800     */
3801
3802    /* NOTE: on the post_send side of an operation, it doesn't really
3803     * matter whether the op is going to be eager or rendezvous.  It is
3804     * handled the same way (except for how the header is filled in).
3805     * The difference is in the recv processing for TCP.
3806     */
3807
3808    /* NOTE: we also don't care what the buffer_type says, TCP could care
3809     * less what buffers it is using.
3810     */
3811
3812    /* encode the message header */
3813    BMI_TCP_ENC_HDR(my_header);
3814
3815    /* the first thing we must do is find out if another send is queued
3816     * up for this address so that we don't mess up our ordering.    */
3817    memset(&key, 0, sizeof(struct op_list_search_key));
3818    key.method_addr = dest;
3819    key.method_addr_yes = 1;
3820    query_op = op_list_search(op_list_array[IND_SEND], &key);
3821    if (query_op)
3822    {
3823        /* queue up operation */
3824        ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3825                                dest, (void **) buffer_list,
3826                                size_list, list_count, 0, 0,
3827                                id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3828                                my_header.size, 0,
3829                                context_id,
3830                                eid);
3831
3832        /* TODO: is this causing deadlocks?  See similar call in recv
3833         * path for another example.  This particular one seems to be an
3834         * issue under a heavy bonnie++ load that Neill has been
3835         * debugging.  Comment out for now to see if the problem goes
3836         * away.
3837         */
3838#if 0
3839        if (ret >= 0)
3840        {
3841            /* go ahead and try to do some work while we are in this
3842             * function since we appear to be backlogged.  Make sure that
3843             * we do not wait in the poll, however.
3844             */
3845            ret = tcp_do_work(0);
3846        }
3847#endif
3848        if (ret < 0)
3849        {
3850            gossip_err("Error: enqueue_operation() or tcp_do_work() returned: %d\n", ret);
3851        }
3852        return (ret);
3853    }
3854
3855    /* make sure the connection is established */
3856    ret = tcp_sock_init(dest);
3857    if (ret < 0)
3858    {
3859        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "tcp_sock_init() failure.\n");
3860        /* tcp_sock_init() returns BMI error code */
3861        tcp_forget_addr(dest, 0, ret);
3862        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, 0, ret);
3863        return (ret);
3864    }
3865
3866    tcp_addr_data = dest->method_data;
3867
3868#if 0
3869    /* TODO: this is a hack for testing! */
3870    /* disables immediate send completion... */
3871    ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3872                            dest, buffer_list, size_list, list_count, 0, 0,
3873                            id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3874                            my_header.size, 0,
3875                            context_id);
3876    return(ret);
3877#endif
3878
3879    if (tcp_addr_data->not_connected)
3880    {
3881        /* if the connection is not completed, queue up for later work */
3882        ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3883                                dest, (void **) buffer_list, size_list,
3884                                list_count, 0, 0,
3885                                id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3886                                my_header.size, 0,
3887                                context_id,
3888                                eid);
3889        if(ret < 0)
3890        {
3891            gossip_err("Error: enqueue_operation() returned: %d\n", ret);
3892        }
3893        return (ret);
3894    }
3895
3896    /* try to send some data */
3897    env_amt_complete = 0;
3898    ret = payload_progress(tcp_addr_data->socket,
3899        (void **) buffer_list,
3900        size_list, list_count, my_header.size, &list_index,
3901        &cur_index_complete, BMI_SEND, my_header.enc_hdr, &env_amt_complete);
3902    if (ret < 0)
3903    {
3904        PVFS_perror_gossip("Error: payload_progress", ret);
3905        /* payload_progress() returns BMI error codes */
3906        tcp_forget_addr(dest, 0, ret);
3907        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, eid, 0, ret);
3908        return (ret);
3909    }
3910
3911    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
3912    amt_complete = ret;
3913    assert(amt_complete <= my_header.size);
3914    if (amt_complete == my_header.size && env_amt_complete == TCP_ENC_HDR_SIZE)
3915    {
3916        /* we are already done */
3917        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid,
3918                       NULL, eid, 0, amt_complete);
3919        return (1);
3920    }
3921
3922    /* queue up the remainder */
3923    ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3924                            dest, (void **) buffer_list,
3925                            size_list, list_count,
3926                            amt_complete, env_amt_complete, id,
3927                            BMI_TCP_INPROGRESS, my_header, user_ptr,
3928                            my_header.size, 0, context_id, eid);
3929
3930    if(ret < 0)
3931    {
3932        gossip_err("Error: enqueue_operation() returned: %d\n", ret);
3933    }
3934    return (ret);
3935}
3936
3937
3938/* payload_progress()
3939 *
3940 * makes progress on sending/recving data payload portion of a message
3941 *
3942 * returns amount completed on success, -errno on failure
3943 */
3944static int payload_progress(int s, void *const *buffer_list, const bmi_size_t*
3945    size_list, int list_count, bmi_size_t total_size, int* list_index,
3946    bmi_size_t* current_index_complete, enum bmi_op_type send_recv,
3947    char* enc_hdr, bmi_size_t* env_amt_complete)
3948{
3949    int i;
3950    int count = 0;
3951    int ret;
3952    int completed;
3953    /* used for finding the stopping point on short receives */
3954    int final_index = list_count-1;
3955    bmi_size_t final_size = size_list[list_count-1];
3956    bmi_size_t sum = 0;
3957    int vector_index = 0;
3958    int header_flag = 0;
3959    int tmp_env_done = 0;
3960
3961    if(send_recv == BMI_RECV)
3962    {
3963        /* find out if we should stop short in list processing */
3964        for(i=0; i<list_count; i++)
3965        {
3966            sum += size_list[i];
3967            if(sum >= total_size)
3968            {
3969                final_index = i;
3970                final_size = size_list[i] - (sum-total_size);
3971                break;
3972            }
3973        }
3974    }
3975
3976    assert(list_count > *list_index);
3977
3978    /* make sure we don't overrun our preallocated iovec array */
3979    if((list_count - (*list_index)) > BMI_TCP_IOV_COUNT)
3980    {
3981        list_count = (*list_index) + BMI_TCP_IOV_COUNT;
3982    }
3983
3984    /* do we need to send any of the header? */
3985    if(send_recv == BMI_SEND && *env_amt_complete < TCP_ENC_HDR_SIZE)
3986    {
3987        stat_io_vector[vector_index].iov_base = &enc_hdr[*env_amt_complete];
3988        stat_io_vector[vector_index].iov_len = TCP_ENC_HDR_SIZE - *env_amt_complete;
3989        count++;
3990        vector_index++;
3991        header_flag = 1;
3992    }
3993
3994    /* setup vector */
3995    stat_io_vector[vector_index].iov_base =
3996        (char*)buffer_list[*list_index] + *current_index_complete;
3997    count++;
3998    if(final_index == 0)
3999    {
4000        stat_io_vector[vector_index].iov_len = final_size - *current_index_complete;
4001    }
4002    else
4003    {
4004        stat_io_vector[vector_index].iov_len =
4005            size_list[*list_index] - *current_index_complete;
4006        for(i = (*list_index + 1); i < list_count; i++)
4007        {
4008            vector_index++;
4009            count++;
4010            stat_io_vector[vector_index].iov_base = buffer_list[i];
4011            if(i == final_index)
4012            {
4013                stat_io_vector[vector_index].iov_len = final_size;
4014                break;
4015            }
4016            else
4017            {
4018                stat_io_vector[vector_index].iov_len = size_list[i];
4019            }
4020        }
4021    }
4022
4023    assert(count > 0);
4024
4025    if(send_recv == BMI_RECV)
4026    {
4027        ret = BMI_sockio_nbvector(s, stat_io_vector, count, 1);
4028    }
4029    else
4030    {
4031        ret = BMI_sockio_nbvector(s, stat_io_vector, count, 0);
4032    }
4033
4034    /* if error or nothing done, return now */
4035    if(ret == 0)
4036        return(0);
4037    if(ret <= 0)
4038        return(bmi_tcp_errno_to_pvfs(-errno));
4039
4040    completed = ret;
4041    if(header_flag && (completed >= 0))
4042    {
4043        /* take care of completed header status */
4044        tmp_env_done = TCP_ENC_HDR_SIZE - *env_amt_complete;
4045        if(tmp_env_done > completed)
4046            tmp_env_done = completed;
4047        completed -= tmp_env_done;
4048        ret -= tmp_env_done;
4049        (*env_amt_complete) += tmp_env_done;
4050    }
4051
4052    i=header_flag;
4053    while(completed > 0)
4054    {
4055        /* take care of completed data payload */
4056        if(completed >= stat_io_vector[i].iov_len)
4057        {
4058            completed -= stat_io_vector[i].iov_len;
4059            *current_index_complete = 0;
4060            (*list_index)++;
4061            i++;
4062        }
4063        else
4064        {
4065            *current_index_complete += completed;
4066            completed = 0;
4067        }
4068    }
4069
4070    return(ret);
4071}
4072
4073static void bmi_set_sock_buffers(int socket){
4074        //Set socket buffer sizes:
4075        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Default socket buffers send:%d receive:%d\n",
4076                GET_SENDBUFSIZE(socket), GET_RECVBUFSIZE(socket));
4077        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Setting socket buffer size for send:%d receive:%d \n",
4078                tcp_buffer_size_send, tcp_buffer_size_receive);
4079    if( tcp_buffer_size_receive != 0)
4080         SET_RECVBUFSIZE(socket,tcp_buffer_size_receive);
4081    if( tcp_buffer_size_send != 0)
4082         SET_SENDBUFSIZE(socket,tcp_buffer_size_send);
4083        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Reread socket buffers send:%d receive:%d\n",
4084                GET_SENDBUFSIZE(socket), GET_RECVBUFSIZE(socket));
4085}
4086
4087/*
4088 * Local variables:
4089 *  c-indent-level: 4
4090 *  c-basic-offset: 4
4091 * End:
4092 *
4093 * vim: ts=8 sts=4 sw=4 expandtab
4094 */
Note: See TracBrowser for help on using the browser.