root/trunk/src/io/bmi/bmi_tcp/bmi-tcp.c @ 7819

Revision 7819, 116.3 KB (checked in by harms, 4 years ago)

Changed bmi-tcp so that an accept failure due to no resources available, that no error is propogarted so the BMI thread doesn't terminate. The accept failure for the no resource condition will also signal BMI to drop all addresses that have a resource count of zero, which may free resources to let future accepts to complete.

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/* TCP/IP implementation of a BMI method */
8
9#include <errno.h>
10#include <string.h>
11#include <unistd.h>
12#include <fcntl.h>
13#include <sys/poll.h>
14#include <netinet/tcp.h>
15#include <assert.h>
16#include <sys/uio.h>
17#include <time.h>
18#include <sys/time.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <arpa/inet.h>
22#include "pint-mem.h"
23
24#include "pvfs2-config.h"
25#ifdef HAVE_NETDB_H
26#include <netdb.h>
27#endif
28
29#include "bmi-method-support.h"
30#include "bmi-method-callback.h"
31#include "bmi-tcp-addressing.h"
32#ifdef __PVFS2_USE_EPOLL__
33#include "socket-collection-epoll.h"
34#else
35#include "socket-collection.h"
36#endif
37#include "op-list.h"
38#include "gossip.h"
39#include "sockio.h"
40#include "bmi-byteswap.h"
41#include "id-generator.h"
42#include "pint-event.h"
43#include "pvfs2-debug.h"
44#ifdef USE_TRUSTED
45#include "server-config.h"
46#include "bmi-tcp-addressing.h"
47#endif
48#include "gen-locks.h"
49#include "pint-hint.h"
50#include "pint-event.h"
51
52static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
53static gen_cond_t interface_cond = GEN_COND_INITIALIZER;
54static int sc_test_busy = 0;
55
56/* function prototypes */
57int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
58                       int method_id,
59                       int init_flags);
60int BMI_tcp_finalize(void);
61int BMI_tcp_set_info(int option,
62                     void *inout_parameter);
63int BMI_tcp_get_info(int option,
64                     void *inout_parameter);
65void *BMI_tcp_memalloc(bmi_size_t size,
66                       enum bmi_op_type send_recv);
67int BMI_tcp_memfree(void *buffer,
68                    bmi_size_t size,
69                    enum bmi_op_type send_recv);
70int BMI_tcp_unexpected_free(void *buffer);
71int BMI_tcp_post_send(bmi_op_id_t * id,
72                      bmi_method_addr_p dest,
73                      const void *buffer,
74                      bmi_size_t size,
75                      enum bmi_buffer_type buffer_type,
76                      bmi_msg_tag_t tag,
77                      void *user_ptr,
78                      bmi_context_id context_id,
79                      PVFS_hint hints);
80int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
81                                bmi_method_addr_p dest,
82                                const void *buffer,
83                                bmi_size_t size,
84                                enum bmi_buffer_type buffer_type,
85                                bmi_msg_tag_t tag,
86                                void *user_ptr,
87                                bmi_context_id context_id,
88                                PVFS_hint hints);
89int BMI_tcp_post_recv(bmi_op_id_t * id,
90                      bmi_method_addr_p src,
91                      void *buffer,
92                      bmi_size_t expected_size,
93                      bmi_size_t * actual_size,
94                      enum bmi_buffer_type buffer_type,
95                      bmi_msg_tag_t tag,
96                      void *user_ptr,
97                      bmi_context_id context_id,
98                      PVFS_hint hints);
99int BMI_tcp_test(bmi_op_id_t id,
100                 int *outcount,
101                 bmi_error_code_t * error_code,
102                 bmi_size_t * actual_size,
103                 void **user_ptr,
104                 int max_idle_time_ms,
105                 bmi_context_id context_id);
106int BMI_tcp_testsome(int incount,
107                     bmi_op_id_t * id_array,
108                     int *outcount,
109                     int *index_array,
110                     bmi_error_code_t * error_code_array,
111                     bmi_size_t * actual_size_array,
112                     void **user_ptr_array,
113                     int max_idle_time_ms,
114                     bmi_context_id context_id);
115int BMI_tcp_testunexpected(int incount,
116                           int *outcount,
117                           struct bmi_method_unexpected_info *info,
118                           int max_idle_time_ms);
119int BMI_tcp_testcontext(int incount,
120                     bmi_op_id_t * out_id_array,
121                     int *outcount,
122                     bmi_error_code_t * error_code_array,
123                     bmi_size_t * actual_size_array,
124                     void **user_ptr_array,
125                     int max_idle_time_ms,
126                     bmi_context_id context_id);
127bmi_method_addr_p BMI_tcp_method_addr_lookup(const char *id_string);
128const char* BMI_tcp_addr_rev_lookup_unexpected(bmi_method_addr_p map);
129int BMI_tcp_query_addr_range(bmi_method_addr_p, const char *, int);
130int BMI_tcp_post_send_list(bmi_op_id_t * id,
131                           bmi_method_addr_p dest,
132                           const void *const *buffer_list,
133                           const bmi_size_t *size_list,
134                           int list_count,
135                           bmi_size_t total_size,
136                           enum bmi_buffer_type buffer_type,
137                           bmi_msg_tag_t tag,
138                           void *user_ptr,
139                           bmi_context_id context_id,
140                           PVFS_hint hints);
141int BMI_tcp_post_recv_list(bmi_op_id_t * id,
142                           bmi_method_addr_p src,
143                           void *const *buffer_list,
144                           const bmi_size_t *size_list,
145                           int list_count,
146                           bmi_size_t total_expected_size,
147                           bmi_size_t * total_actual_size,
148                           enum bmi_buffer_type buffer_type,
149                           bmi_msg_tag_t tag,
150                           void *user_ptr,
151                           bmi_context_id context_id,
152                           PVFS_hint hints);
153int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
154                                     bmi_method_addr_p dest,
155                                     const void *const *buffer_list,
156                                     const bmi_size_t *size_list,
157                                     int list_count,
158                                     bmi_size_t total_size,
159                                     enum bmi_buffer_type buffer_type,
160                                     bmi_msg_tag_t tag,
161                                     void *user_ptr,
162                                     bmi_context_id context_id,
163                                     PVFS_hint hints);
164int BMI_tcp_open_context(bmi_context_id context_id);
165void BMI_tcp_close_context(bmi_context_id context_id);
166int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id);
167
168char BMI_tcp_method_name[] = "bmi_tcp";
169
170/* size of encoded message header */
171#define TCP_ENC_HDR_SIZE 24
172
173/* structure internal to tcp for use as a message header */
174struct tcp_msg_header
175{
176    uint32_t magic_nr;          /* magic number */
177    uint32_t mode;              /* eager, rendezvous, etc. */
178    bmi_msg_tag_t tag;          /* user specified message tag */
179    bmi_size_t size;            /* length of trailing message */
180    char enc_hdr[TCP_ENC_HDR_SIZE];  /* encoded version of header info */
181};
182
183#define BMI_TCP_ENC_HDR(hdr)                                            \
184    do {                                                                \
185        *((uint32_t*)&((hdr).enc_hdr[0])) = htobmi32((hdr).magic_nr);   \
186        *((uint32_t*)&((hdr).enc_hdr[4])) = htobmi32((hdr).mode);       \
187        *((uint64_t*)&((hdr).enc_hdr[8])) = htobmi64((hdr).tag);        \
188        *((uint64_t*)&((hdr).enc_hdr[16])) = htobmi64((hdr).size);      \
189    } while(0)                                             
190
191#define BMI_TCP_DEC_HDR(hdr)                                            \
192    do {                                                                \
193        (hdr).magic_nr = bmitoh32(*((uint32_t*)&((hdr).enc_hdr[0])));   \
194        (hdr).mode = bmitoh32(*((uint32_t*)&((hdr).enc_hdr[4])));       \
195        (hdr).tag = bmitoh64(*((uint64_t*)&((hdr).enc_hdr[8])));        \
196        (hdr).size = bmitoh64(*((uint64_t*)&((hdr).enc_hdr[16])));      \
197    } while(0)                                             
198
199/* enumerate states that we care about */
200enum bmi_tcp_state
201{
202    BMI_TCP_INPROGRESS,
203    BMI_TCP_BUFFERING,
204    BMI_TCP_COMPLETE
205};
206
207/* tcp private portion of operation structure */
208struct tcp_op
209{
210    struct tcp_msg_header env;  /* envelope for this message */
211    enum bmi_tcp_state tcp_op_state;
212    /* these two fields are used as place holders for the buffer
213     * list and size list when we really don't have lists (regular
214     * BMI_send or BMI_recv operations); it allows us to use
215     * generic code to handle both cases
216     */
217    void *buffer_list_stub;
218    bmi_size_t size_list_stub;
219};
220
221/* static io vector for use with readv and writev; we can only use
222 * this because BMI serializes module calls
223 */
224#define BMI_TCP_IOV_COUNT 10
225static struct iovec stat_io_vector[BMI_TCP_IOV_COUNT+1];
226
227/* internal utility functions */
228static int tcp_server_init(void);
229static void dealloc_tcp_method_addr(bmi_method_addr_p map);
230static int tcp_sock_init(bmi_method_addr_p my_method_addr);
231static int enqueue_operation(op_list_p target_list,
232                             enum bmi_op_type send_recv,
233                             bmi_method_addr_p map,
234                             void *const *buffer_list,
235                             const bmi_size_t *size_list,
236                             int list_count,
237                             bmi_size_t amt_complete,
238                             bmi_size_t env_amt_complete,
239                             bmi_op_id_t * id,
240                             int tcp_op_state,
241                             struct tcp_msg_header header,
242                             void *user_ptr,
243                             bmi_size_t actual_size,
244                             bmi_size_t expected_size,
245                             bmi_context_id context_id,
246                             int32_t event_id);
247static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code);
248static int tcp_shutdown_addr(bmi_method_addr_p map);
249static int tcp_do_work(int max_idle_time);
250static int tcp_do_work_error(bmi_method_addr_p map);
251static int tcp_do_work_recv(bmi_method_addr_p map, int* stall_flag);
252static int tcp_do_work_send(bmi_method_addr_p map, int* stall_flag);
253static int work_on_recv_op(method_op_p my_method_op,
254                           int *stall_flag);
255static int work_on_send_op(method_op_p my_method_op,
256                           int *blocked_flag, int* stall_flag);
257static int tcp_accept_init(int *socket, char** peer);
258static method_op_p alloc_tcp_method_op(void);
259static void dealloc_tcp_method_op(method_op_p old_op);
260static int handle_new_connection(bmi_method_addr_p map);
261static int tcp_post_send_generic(bmi_op_id_t * id,
262                                 bmi_method_addr_p dest,
263                                 const void *const *buffer_list,
264                                 const bmi_size_t *size_list,
265                                 int list_count,
266                                 enum bmi_buffer_type buffer_type,
267                                 struct tcp_msg_header my_header,
268                                 void *user_ptr,
269                                 bmi_context_id context_id,
270                                 PVFS_hint hints);
271static int tcp_post_recv_generic(bmi_op_id_t * id,
272                                 bmi_method_addr_p src,
273                                 void *const *buffer_list,
274                                 const bmi_size_t *size_list,
275                                 int list_count,
276                                 bmi_size_t expected_size,
277                                 bmi_size_t * actual_size,
278                                 enum bmi_buffer_type buffer_type,
279                                 bmi_msg_tag_t tag,
280                                 void *user_ptr,
281                                 bmi_context_id context_id,
282                                 PVFS_hint hints);
283static int payload_progress(int s, void *const *buffer_list, const bmi_size_t*
284    size_list, int list_count, bmi_size_t total_size, int* list_index,
285    bmi_size_t* current_index_complete, enum bmi_op_type send_recv,
286    char* enc_hdr, bmi_size_t* env_amt_complete);
287
288#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
289static int tcp_enable_trusted(struct tcp_addr *tcp_addr_data);
290#endif
291#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
292static int tcp_allow_trusted(struct sockaddr_in *peer_sockaddr);
293#endif
294
295static void bmi_set_sock_buffers(int socket);
296
297/* exported method interface */
298const struct bmi_method_ops bmi_tcp_ops = {
299    .method_name = BMI_tcp_method_name,
300    .initialize = BMI_tcp_initialize,
301    .finalize = BMI_tcp_finalize,
302    .set_info = BMI_tcp_set_info,
303    .get_info = BMI_tcp_get_info,
304    .memalloc = BMI_tcp_memalloc,
305    .memfree  = BMI_tcp_memfree,
306    .unexpected_free = BMI_tcp_unexpected_free,
307    .post_send = BMI_tcp_post_send,
308    .post_sendunexpected = BMI_tcp_post_sendunexpected,
309    .post_recv = BMI_tcp_post_recv,
310    .test = BMI_tcp_test,
311    .testsome = BMI_tcp_testsome,
312    .testcontext = BMI_tcp_testcontext,
313    .testunexpected = BMI_tcp_testunexpected,
314    .method_addr_lookup = BMI_tcp_method_addr_lookup,
315    .post_send_list = BMI_tcp_post_send_list,
316    .post_recv_list = BMI_tcp_post_recv_list,
317    .post_sendunexpected_list = BMI_tcp_post_sendunexpected_list,
318    .open_context = BMI_tcp_open_context,
319    .close_context = BMI_tcp_close_context,
320    .cancel = BMI_tcp_cancel,
321    .rev_lookup_unexpected = BMI_tcp_addr_rev_lookup_unexpected,
322    .query_addr_range = BMI_tcp_query_addr_range,
323};
324
325/* module parameters */
326static struct
327{
328    int method_flags;
329    int method_id;
330    bmi_method_addr_p listen_addr;
331} tcp_method_params;
332
333#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
334static struct tcp_allowed_connection_s *gtcp_allowed_connection = NULL;
335#endif
336
337static int check_unexpected = 1;
338
339/* op_list_array indices */
340enum
341{
342    NUM_INDICES = 5,
343    IND_SEND = 0,
344    IND_RECV = 1,
345    IND_RECV_INFLIGHT = 2,
346    IND_RECV_EAGER_DONE_BUFFERING = 3,
347    IND_COMPLETE_RECV_UNEXP = 4,        /* MAKE THIS COMES LAST */
348};
349
350/* internal operation lists */
351static op_list_p op_list_array[6] = { NULL, NULL, NULL, NULL,
352    NULL, NULL
353};
354
355/* internal completion queues */
356static op_list_p completion_array[BMI_MAX_CONTEXTS] = { NULL };
357
358/* internal socket collection */
359static socket_collection_p tcp_socket_collection_p = NULL;
360
361/* tunable parameters */
362enum
363{
364    /* amount of pending connections we'll allow */
365    TCP_BACKLOG = 256,
366    /* amount of work to be done during a test.  This roughly
367     * translates into the number of sockets that we will perform
368     * nonblocking operations on during one function call.
369     */
370    TCP_WORK_METRIC = 128
371};
372
373/* TCP message modes */
374enum
375{
376    TCP_MODE_IMMED = 1,         /* not used for TCP/IP */
377    TCP_MODE_UNEXP = 2,
378    TCP_MODE_EAGER = 4,
379    TCP_MODE_REND = 8
380};
381
382/* Allowable sizes for each mode */
383enum
384{
385    TCP_MODE_EAGER_LIMIT = 16384,       /* 16K */
386    TCP_MODE_REND_LIMIT = 16777216      /* 16M */
387};
388
389/* toggles cancel mode; for bmi_tcp this will result in socket being closed
390 * in all cancellation cases
391 */
392static int forceful_cancel_mode = 0;
393
394/*
395  Socket buffer sizes, currently these default values will be used
396  for the clients... (TODO)
397 */
398static int tcp_buffer_size_receive = 0;
399static int tcp_buffer_size_send = 0;
400
401static PINT_event_type bmi_tcp_send_event_id;
402static PINT_event_type bmi_tcp_recv_event_id;
403
404static PINT_event_group bmi_tcp_event_group;
405static pid_t bmi_tcp_pid;
406
407/*************************************************************************
408 * Visible Interface
409 */
410
411/* BMI_tcp_initialize()
412 *
413 * Initializes the tcp method.  Must be called before any other tcp
414 * method functions.
415 *
416 * returns 0 on success, -errno on failure
417 */
418int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
419                       int method_id,
420                       int init_flags)
421{
422
423    int ret = -1;
424    int tmp_errno = bmi_tcp_errno_to_pvfs(-ENOSYS);
425    struct tcp_addr *tcp_addr_data = NULL;
426    int i = 0;
427
428    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Initializing TCP/IP module.\n");
429
430    /* check args */
431    if ((init_flags & BMI_INIT_SERVER) && !listen_addr)
432    {
433        gossip_lerr("Error: bad parameters given to TCP/IP module.\n");
434        return (bmi_tcp_errno_to_pvfs(-EINVAL));
435    }
436
437    gen_mutex_lock(&interface_mutex);
438
439    /* zero out our parameter structure and fill it in */
440    memset(&tcp_method_params, 0, sizeof(tcp_method_params));
441    tcp_method_params.method_id = method_id;
442    tcp_method_params.method_flags = init_flags;
443
444    if (init_flags & BMI_INIT_SERVER)
445    {
446        /* hang on to our local listening address if needed */
447        tcp_method_params.listen_addr = listen_addr;
448        /* and initialize server functions */
449        ret = tcp_server_init();
450        if (ret < 0)
451        {
452            tmp_errno = bmi_tcp_errno_to_pvfs(ret);
453            gossip_err("Error: tcp_server_init() failure.\n");
454            goto initialize_failure;
455        }
456    }
457
458    /* set up the operation lists */
459    for (i = 0; i < NUM_INDICES; i++)
460    {
461        op_list_array[i] = op_list_new();
462        if (!op_list_array[i])
463        {
464            tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
465            goto initialize_failure;
466        }
467    }
468
469    /* set up the socket collection */
470    if (tcp_method_params.method_flags & BMI_INIT_SERVER)
471    {
472        tcp_addr_data = tcp_method_params.listen_addr->method_data;
473        tcp_socket_collection_p = BMI_socket_collection_init(tcp_addr_data->socket);
474    }
475    else
476    {
477        tcp_socket_collection_p = BMI_socket_collection_init(-1);
478    }
479
480    if (!tcp_socket_collection_p)
481    {
482        tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
483        goto initialize_failure;
484    }
485
486    bmi_tcp_pid = getpid();
487    PINT_event_define_group("bmi_tcp", &bmi_tcp_event_group);
488
489    /* Define the send event:
490     *   START: (client_id, request_id, rank, handle, op_id, send_size)
491     *   STOP: (size_sent)
492     */
493    PINT_event_define_event(
494        &bmi_tcp_event_group,
495#ifdef __PVFS2_SERVER__
496        "bmi_server_send",
497#else
498        "bmi_client_send",
499#endif
500        "%d%d%d%llu%d%d",
501        "%d", &bmi_tcp_send_event_id);
502
503    /* Define the recv event:
504     *   START: (client_id, request_id, rank, handle, op_id, recv_size)
505     *   STOP: (size_received)
506     */
507    PINT_event_define_event(
508        &bmi_tcp_event_group,
509#ifdef __PVFS2_SERVER__
510        "bmi_server_recv",
511#else
512        "bmi_client_recv",
513#endif
514        "%d%d%d%llu%d%d",
515        "%d", &bmi_tcp_recv_event_id);
516
517    gen_mutex_unlock(&interface_mutex);
518    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
519                  "TCP/IP module successfully initialized.\n");
520    return (0);
521
522  initialize_failure:
523
524    /* cleanup data structures and bail out */
525    for (i = 0; i < NUM_INDICES; i++)
526    {
527        if (op_list_array[i])
528        {
529            op_list_cleanup(op_list_array[i]);
530        }
531    }
532    if (tcp_socket_collection_p)
533    {
534        BMI_socket_collection_finalize(tcp_socket_collection_p);
535    }
536    gen_mutex_unlock(&interface_mutex);
537    return (tmp_errno);
538}
539
540
541/* BMI_tcp_finalize()
542 *
543 * Shuts down the tcp method.
544 *
545 * returns 0 on success, -errno on failure
546 */
547int BMI_tcp_finalize(void)
548{
549    int i = 0;
550
551    gen_mutex_lock(&interface_mutex);
552
553    /* shut down our listen addr, if we have one */
554    if ((tcp_method_params.method_flags & BMI_INIT_SERVER)
555        && tcp_method_params.listen_addr)
556    {
557        dealloc_tcp_method_addr(tcp_method_params.listen_addr);
558    }
559
560    /* note that this forcefully shuts down operations */
561    for (i = 0; i < NUM_INDICES; i++)
562    {
563        if (op_list_array[i])
564        {
565            op_list_cleanup(op_list_array[i]);
566            op_list_array[i] = NULL;
567        }
568    }
569
570    /* get rid of socket collection */
571    if (tcp_socket_collection_p)
572    {
573        BMI_socket_collection_finalize(tcp_socket_collection_p);
574        tcp_socket_collection_p = NULL;
575    }
576
577    /* NOTE: we are trusting the calling BMI layer to deallocate
578     * all of the method addresses (this will close any open sockets)
579     */
580    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "TCP/IP module finalized.\n");
581    gen_mutex_unlock(&interface_mutex);
582    return (0);
583}
584
585
586/*
587 * BMI_tcp_method_addr_lookup()
588 *
589 * resolves the string representation of an address into a method
590 * address structure. 
591 *
592 * returns a pointer to method_addr on success, NULL on failure
593 */
594bmi_method_addr_p BMI_tcp_method_addr_lookup(const char *id_string)
595{
596    char *tcp_string = NULL;
597    char *delim = NULL;
598    char *hostname = NULL;
599    bmi_method_addr_p new_addr = NULL;
600    struct tcp_addr *tcp_addr_data = NULL;
601    int ret = -1;
602
603    tcp_string = string_key("tcp", id_string);
604    if (!tcp_string)
605    {
606        /* the string doesn't even have our info */
607        return (NULL);
608    }
609
610    /* start breaking up the method information */
611    /* for normal tcp, it is simply hostname:port */
612    if ((delim = index(tcp_string, ':')) == NULL)
613    {
614        gossip_lerr("Error: malformed tcp address.\n");
615        free(tcp_string);
616        return (NULL);
617    }
618
619    /* looks ok, so let's build the method addr structure */
620    new_addr = alloc_tcp_method_addr();
621    if (!new_addr)
622    {
623        free(tcp_string);
624        return (NULL);
625    }
626    tcp_addr_data = new_addr->method_data;
627
628    ret = sscanf((delim + 1), "%d", &(tcp_addr_data->port));
629    if (ret != 1)
630    {
631        gossip_lerr("Error: malformed tcp address.\n");
632        dealloc_tcp_method_addr(new_addr);
633        free(tcp_string);
634        return (NULL);
635    }
636
637    hostname = (char *) malloc((delim - tcp_string + 1));
638    if (!hostname)
639    {
640        dealloc_tcp_method_addr(new_addr);
641        free(tcp_string);
642        return (NULL);
643    }
644    strncpy(hostname, tcp_string, (delim - tcp_string));
645    hostname[delim - tcp_string] = '\0';
646
647    tcp_addr_data->hostname = hostname;
648
649    free(tcp_string);
650    return (new_addr);
651}
652
653
654/* BMI_tcp_memalloc()
655 *
656 * Allocates memory that can be used in native mode by tcp.
657 *
658 * returns 0 on success, -errno on failure
659 */
660void *BMI_tcp_memalloc(bmi_size_t size,
661                       enum bmi_op_type send_recv)
662{
663    /* we really don't care what flags the caller uses, TCP/IP has no
664     * preferences about how the memory should be configured.
665     */
666
667/*    return (calloc(1,(size_t) size)); */
668    return PINT_mem_aligned_alloc(size, 4096);
669}
670
671
672/* BMI_tcp_memfree()
673 *
674 * Frees memory that was allocated with BMI_tcp_memalloc()
675 *
676 * returns 0 on success, -errno on failure
677 */
678int BMI_tcp_memfree(void *buffer,
679                    bmi_size_t size,
680                    enum bmi_op_type send_recv)
681{
682    PINT_mem_aligned_free(buffer);
683    return (0);
684}
685
686/* BMI_tcp_unexpected_free()
687 *
688 * Frees memory that was returned from BMI_tcp_test_unexpected()
689 *
690 * returns 0 on success, -errno on failure
691 */
692int BMI_tcp_unexpected_free(void *buffer)
693{
694    if (buffer)
695    {
696        free(buffer);
697    }
698    return (0);
699}
700
701#ifdef USE_TRUSTED
702
703static struct tcp_allowed_connection_s *
704alloc_trusted_connection_info(int network_count)
705{
706    struct tcp_allowed_connection_s *tcp_allowed_connection_info = NULL;
707
708    tcp_allowed_connection_info = (struct tcp_allowed_connection_s *)
709            calloc(1, sizeof(struct tcp_allowed_connection_s));
710    if (tcp_allowed_connection_info)
711    {
712        tcp_allowed_connection_info->network =
713            (struct in_addr *) calloc(network_count, sizeof(struct in_addr));
714        if (tcp_allowed_connection_info->network == NULL)
715        {
716            free(tcp_allowed_connection_info);
717            tcp_allowed_connection_info = NULL;
718        }
719        else
720        {
721            tcp_allowed_connection_info->netmask =
722                (struct in_addr *) calloc(network_count, sizeof(struct in_addr));
723            if (tcp_allowed_connection_info->netmask == NULL)
724            {
725                free(tcp_allowed_connection_info->network);
726                free(tcp_allowed_connection_info);
727                tcp_allowed_connection_info = NULL;
728            }
729            else {
730                tcp_allowed_connection_info->network_count = network_count;
731            }
732        }
733    }
734    return tcp_allowed_connection_info;
735}
736
737static void
738dealloc_trusted_connection_info(void* ptcp_allowed_connection_info)
739{
740    struct tcp_allowed_connection_s *tcp_allowed_connection_info =
741        (struct tcp_allowed_connection_s *) ptcp_allowed_connection_info;
742    if (tcp_allowed_connection_info)
743    {
744        free(tcp_allowed_connection_info->network);
745        tcp_allowed_connection_info->network = NULL;
746        free(tcp_allowed_connection_info->netmask);
747        tcp_allowed_connection_info->netmask = NULL;
748        free(tcp_allowed_connection_info);
749    }
750    return;
751}
752
753#endif
754
755/*
756 * This function will convert a mask_bits value to an in_addr
757 * representation. i.e for example if
758 * mask_bits was 24 then it would be 255.255.255.0
759 * if mask_bits was 22 then it would be 255.255.252.0
760 * etc
761 */
762static void convert_mask(int mask_bits, struct in_addr *mask)
763{
764   uint32_t addr = -1;
765   addr = addr & ~~(-1 << (mask_bits ? (32 - mask_bits) : 32));
766   mask->s_addr = htonl(addr);
767   return;
768}
769
770/* BMI_tcp_set_info()
771 *
772 * Pass in optional parameters.
773 *
774 * returns 0 on success, -errno on failure
775 */
776int BMI_tcp_set_info(int option,
777                     void *inout_parameter)
778{
779    int ret = -1;
780    bmi_method_addr_p tmp_addr = NULL;
781
782    gen_mutex_lock(&interface_mutex);
783
784    switch (option)
785    {
786    case BMI_TCP_BUFFER_SEND_SIZE:
787       tcp_buffer_size_send = *((int *)inout_parameter);
788       ret = 0;
789#ifdef __PVFS2_SERVER__
790       /* Set the default socket buffer sizes for the server socket */
791       bmi_set_sock_buffers(
792           ((struct tcp_addr *)
793            tcp_method_params.listen_addr->method_data)->socket);
794#endif
795       break;
796    case BMI_TCP_BUFFER_RECEIVE_SIZE:
797       tcp_buffer_size_receive = *((int *)inout_parameter);
798       ret = 0;
799#ifdef __PVFS2_SERVER__
800       /* Set the default socket buffer sizes for the server socket */
801       bmi_set_sock_buffers(
802           ((struct tcp_addr *)
803            tcp_method_params.listen_addr->method_data)->socket);
804#endif
805       break;
806    case BMI_TCP_CLOSE_SOCKET:
807        /* this should no longer make it to the bmi_tcp method; see bmi.c */
808        ret = 0;
809        break;
810    case BMI_FORCEFUL_CANCEL_MODE:
811        forceful_cancel_mode = 1;
812        ret = 0;
813        break;
814    case BMI_DROP_ADDR:
815        if (inout_parameter == NULL)
816        {
817            ret = bmi_tcp_errno_to_pvfs(-EINVAL);
818        }
819        else
820        {
821            tmp_addr = (bmi_method_addr_p) inout_parameter;
822            /* take it out of the socket collection */
823            tcp_forget_addr(tmp_addr, 1, 0);
824            ret = 0;
825        }
826        break;
827#ifdef USE_TRUSTED
828    case BMI_TRUSTED_CONNECTION:
829    {
830        struct tcp_allowed_connection_s *tcp_allowed_connection = NULL;
831        if (inout_parameter == NULL)
832        {
833            ret = bmi_tcp_errno_to_pvfs(-EINVAL);
834            break;
835        }
836        else
837        {
838            int    bmi_networks_count = 0;
839            char **bmi_networks = NULL;
840            int   *bmi_netmasks = NULL;
841            struct server_configuration_s *svc_config = NULL;
842
843            svc_config = (struct server_configuration_s *) inout_parameter;
844            tcp_allowed_connection = alloc_trusted_connection_info(svc_config->allowed_networks_count);
845            if (tcp_allowed_connection == NULL)
846            {
847                ret = bmi_tcp_errno_to_pvfs(-ENOMEM);
848                break;
849            }
850#ifdef      __PVFS2_SERVER__
851            gtcp_allowed_connection = tcp_allowed_connection;
852#endif
853            /* Stash this in the server_configuration_s structure. freed later on */
854            svc_config->security = tcp_allowed_connection;
855            svc_config->security_dtor = &dealloc_trusted_connection_info;
856            ret = 0;
857            /* Fill up the list of allowed ports */
858            PINT_config_get_allowed_ports(svc_config,
859                    &tcp_allowed_connection->port_enforce,
860                    tcp_allowed_connection->ports);
861
862            /* if it was enabled, make sure that we know how to deal with it */
863            if (tcp_allowed_connection->port_enforce == 1)
864            {
865                /* illegal ports */
866                if (tcp_allowed_connection->ports[0] > 65535
867                        || tcp_allowed_connection->ports[1] > 65535
868                        || tcp_allowed_connection->ports[1] < tcp_allowed_connection->ports[0])
869                {
870                    gossip_lerr("Error: illegal trusted port values\n");
871                    ret = bmi_tcp_errno_to_pvfs(-EINVAL);
872                    /* don't enforce anything! */
873                    tcp_allowed_connection->port_enforce = 0;
874                }
875            }
876            ret = 0;
877            /* Retrieve the list of BMI network addresses and masks  */
878            PINT_config_get_allowed_networks(svc_config,
879                    &tcp_allowed_connection->network_enforce,
880                    &bmi_networks_count,
881                    &bmi_networks,
882                    &bmi_netmasks);
883
884            /* if it was enabled, make sure that we know how to deal with it */
885            if (tcp_allowed_connection->network_enforce == 1)
886            {
887                int i;
888
889                for (i = 0; i < bmi_networks_count; i++)
890                {
891                    char *tcp_string = NULL;
892                    /* Convert the network string into an in_addr_t structure */
893                    tcp_string = string_key("tcp", bmi_networks[i]);
894                    if (!tcp_string)
895                    {
896                        /* the string doesn't even have our info */
897                        gossip_lerr("Error: malformed tcp network address\n");
898                        ret = bmi_tcp_errno_to_pvfs(-EINVAL);
899                    }
900                    else {
901                        /* convert this into an in_addr_t */
902                        inet_aton(tcp_string, &tcp_allowed_connection->network[i]);
903                        free(tcp_string);
904                    }
905                    convert_mask(bmi_netmasks[i], &tcp_allowed_connection->netmask[i]);
906                }
907                /* don't enforce anything if there were any errors */
908                if (ret != 0)
909                {
910                    tcp_allowed_connection->network_enforce = 0;
911                }
912            }
913        }
914        break;
915    }
916#endif
917    case BMI_TCP_CHECK_UNEXPECTED:
918    {
919        check_unexpected = *(int *)inout_parameter;
920        ret = 0;
921        break;
922    }
923
924    default:
925        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
926                      "TCP hint %d not implemented.\n", option);
927        ret = 0;
928        break;
929    }
930
931    gen_mutex_unlock(&interface_mutex);
932    return (ret);
933}
934
935/* BMI_tcp_get_info()
936 *
937 * Query for optional parameters.
938 *
939 * returns 0 on success, -errno on failure
940 */
941int BMI_tcp_get_info(int option,
942                     void *inout_parameter)
943{
944    struct method_drop_addr_query* query;
945    struct tcp_addr* tcp_addr_data;
946    int ret = 0;
947
948    gen_mutex_lock(&interface_mutex);
949
950    switch (option)
951    {
952    case BMI_CHECK_MAXSIZE:
953        *((int *) inout_parameter) = TCP_MODE_REND_LIMIT;
954        ret = 0;
955        break;
956    case BMI_DROP_ADDR_QUERY:
957        query = (struct method_drop_addr_query*)inout_parameter;
958        tcp_addr_data=query->addr->method_data;
959        /* only suggest that we discard the address if we have experienced
960         * an error and there is no way to reconnect
961         */
962        if(tcp_addr_data->addr_error != 0 &&
963           tcp_addr_data->dont_reconnect == 1)
964        {
965            query->response = 1;
966        }
967        else
968        {
969            query->response = 0;
970        }
971        ret = 0;
972        break;
973    case BMI_GET_UNEXP_SIZE:
974        *((int *) inout_parameter) = TCP_MODE_EAGER_LIMIT;
975        ret = 0;
976        break;
977
978    default:
979        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
980                      "TCP hint %d not implemented.\n", option);
981        ret = -ENOSYS;
982        break;
983    }
984
985    gen_mutex_unlock(&interface_mutex);
986    return (ret < 0) ? bmi_tcp_errno_to_pvfs(ret) : ret;
987}
988
989
990/* BMI_tcp_post_send()
991 *
992 * Submits send operations.
993 *
994 * returns 0 on success that requires later poll, returns 1 on instant
995 * completion, -errno on failure
996 */
997int BMI_tcp_post_send(bmi_op_id_t * id,
998                      bmi_method_addr_p dest,
999                      const void *buffer,
1000                      bmi_size_t size,
1001                      enum bmi_buffer_type buffer_type,
1002                      bmi_msg_tag_t tag,
1003                      void *user_ptr,
1004                      bmi_context_id context_id,
1005                      PVFS_hint hints)
1006{
1007    struct tcp_msg_header my_header;
1008    int ret = -1;
1009
1010    /* clear the id field for safety */
1011    *id = 0;
1012
1013    /* fill in the TCP-specific message header */
1014    if (size > TCP_MODE_REND_LIMIT)
1015    {
1016        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1017    }
1018
1019    if (size <= TCP_MODE_EAGER_LIMIT)
1020    {
1021        my_header.mode = TCP_MODE_EAGER;
1022    }
1023    else
1024    {
1025        my_header.mode = TCP_MODE_REND;
1026    }
1027    my_header.tag = tag;
1028    my_header.size = size;
1029    my_header.magic_nr = BMI_MAGIC_NR;
1030
1031    gen_mutex_lock(&interface_mutex);
1032
1033    ret = tcp_post_send_generic(id, dest, &buffer,
1034                                &size, 1, buffer_type, my_header,
1035                                user_ptr, context_id, hints);
1036
1037    gen_mutex_unlock(&interface_mutex);
1038    return(ret);
1039}
1040
1041
1042/* BMI_tcp_post_sendunexpected()
1043 *
1044 * Submits unexpected send operations.
1045 *
1046 * returns 0 on success that requires later poll, returns 1 on instant
1047 * completion, -errno on failure
1048 */
1049int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
1050                                bmi_method_addr_p dest,
1051                                const void *buffer,
1052                                bmi_size_t size,
1053                                enum bmi_buffer_type buffer_type,
1054                                bmi_msg_tag_t tag,
1055                                void *user_ptr,
1056                                bmi_context_id context_id,
1057                                PVFS_hint hints)
1058{
1059    struct tcp_msg_header my_header;
1060    int ret = -1;
1061
1062    /* clear the id field for safety */
1063    *id = 0;
1064
1065    if (size > TCP_MODE_EAGER_LIMIT)
1066    {
1067        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1068    }
1069
1070    my_header.mode = TCP_MODE_UNEXP;
1071    my_header.tag = tag;
1072    my_header.size = size;
1073    my_header.magic_nr = BMI_MAGIC_NR;
1074
1075    gen_mutex_lock(&interface_mutex);
1076
1077    ret = tcp_post_send_generic(id, dest, &buffer,
1078                                &size, 1, buffer_type, my_header,
1079                                user_ptr, context_id, hints);
1080    gen_mutex_unlock(&interface_mutex);
1081    return(ret);
1082}
1083
1084
1085
1086/* BMI_tcp_post_recv()
1087 *
1088 * Submits recv operations.
1089 *
1090 * returns 0 on success that requires later poll, returns 1 on instant
1091 * completion, -errno on failure
1092 */
1093int BMI_tcp_post_recv(bmi_op_id_t * id,
1094                      bmi_method_addr_p src,
1095                      void *buffer,
1096                      bmi_size_t expected_size,
1097                      bmi_size_t * actual_size,
1098                      enum bmi_buffer_type buffer_type,
1099                      bmi_msg_tag_t tag,
1100                      void *user_ptr,
1101                      bmi_context_id context_id,
1102                      PVFS_hint hints)
1103{
1104    int ret = -1;
1105
1106    /* A few things could happen here:
1107     * a) rendez. recv with sender not ready yet
1108     * b) rendez. recv with sender waiting
1109     * c) eager recv, data not available yet
1110     * d) eager recv, some/all data already here
1111     * e) rendez. recv with sender in eager mode
1112     *
1113     * b or d could lead to completion without polling.
1114     * we don't look for unexpected messages here.
1115     */
1116
1117    if (expected_size > TCP_MODE_REND_LIMIT)
1118    {
1119        return (bmi_tcp_errno_to_pvfs(-EINVAL));
1120    }
1121    gen_mutex_lock(&interface_mutex);
1122
1123    ret = tcp_post_recv_generic(id, src, &buffer, &expected_size,
1124                                1, expected_size, actual_size,
1125                                buffer_type, tag,
1126                                user_ptr, context_id, hints);
1127
1128    gen_mutex_unlock(&interface_mutex);
1129    return (ret);
1130}
1131
1132
1133/* BMI_tcp_test()
1134 *
1135 * Checks to see if a particular message has completed.
1136 *
1137 * returns 0 on success, -errno on failure
1138 */
1139int BMI_tcp_test(bmi_op_id_t id,
1140                 int *outcount,
1141                 bmi_error_code_t * error_code,
1142                 bmi_size_t * actual_size,
1143                 void **user_ptr,
1144                 int max_idle_time,
1145                 bmi_context_id context_id)
1146{
1147    int ret = -1;
1148    method_op_p query_op = (method_op_p)id_gen_fast_lookup(id);
1149
1150    assert(query_op != NULL);
1151
1152    gen_mutex_lock(&interface_mutex);
1153
1154    /* do some ``real work'' here */
1155    ret = tcp_do_work(max_idle_time);
1156    if (ret < 0)
1157    {
1158        gen_mutex_unlock(&interface_mutex);
1159        return (ret);
1160    }
1161
1162    if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1163        BMI_TCP_COMPLETE)
1164    {
1165        assert(query_op->context_id == context_id);
1166        op_list_remove(query_op);
1167        if (user_ptr != NULL)
1168        {
1169            (*user_ptr) = query_op->user_ptr;
1170        }
1171        (*error_code) = query_op->error_code;
1172        (*actual_size) = query_op->actual_size;
1173        PINT_EVENT_END(
1174            (query_op->send_recv == BMI_SEND ?
1175             bmi_tcp_send_event_id : bmi_tcp_recv_event_id), bmi_tcp_pid, NULL,
1176             query_op->event_id, id, *actual_size);
1177
1178        dealloc_tcp_method_op(query_op);
1179        (*outcount)++;
1180    }
1181
1182    gen_mutex_unlock(&interface_mutex);
1183    return (0);
1184}
1185
1186/* BMI_tcp_testsome()
1187 *
1188 * Checks to see if any messages from the specified list have completed.
1189 *
1190 * returns 0 on success, -errno on failure
1191 */
1192int BMI_tcp_testsome(int incount,
1193                     bmi_op_id_t * id_array,
1194                     int *outcount,
1195                     int *index_array,
1196                     bmi_error_code_t * error_code_array,
1197                     bmi_size_t * actual_size_array,
1198                     void **user_ptr_array,
1199                     int max_idle_time,
1200                     bmi_context_id context_id)
1201{
1202    int ret = -1;
1203    method_op_p query_op = NULL;
1204    int i;
1205
1206    gen_mutex_lock(&interface_mutex);
1207
1208    /* do some ``real work'' here */
1209    ret = tcp_do_work(max_idle_time);
1210    if (ret < 0)
1211    {
1212        gen_mutex_unlock(&interface_mutex);
1213        return (ret);
1214    }
1215
1216    for(i=0; i<incount; i++)
1217    {
1218        if(id_array[i])
1219        {
1220            /* NOTE: this depends on the user passing in valid id's;
1221             * otherwise we segfault. 
1222             */
1223            query_op = (method_op_p)id_gen_fast_lookup(id_array[i]);
1224            if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1225               BMI_TCP_COMPLETE)
1226            {
1227                assert(query_op->context_id == context_id);
1228                /* this one's done; pop it out */
1229                op_list_remove(query_op);
1230                error_code_array[*outcount] = query_op->error_code;
1231                actual_size_array[*outcount] = query_op->actual_size;
1232                index_array[*outcount] = i;
1233                if (user_ptr_array != NULL)
1234                {
1235                    user_ptr_array[*outcount] = query_op->user_ptr;
1236                }
1237                PINT_EVENT_END(
1238                    (query_op->send_recv == BMI_SEND ?
1239                     bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
1240                    bmi_tcp_pid, NULL,
1241                    query_op->event_id, actual_size_array[*outcount]);
1242                dealloc_tcp_method_op(query_op);
1243                (*outcount)++;
1244            }
1245        }
1246    }
1247
1248    gen_mutex_unlock(&interface_mutex);
1249    return(0);
1250}
1251
1252
1253/* BMI_tcp_testunexpected()
1254 *
1255 * Checks to see if any unexpected messages have completed.
1256 *
1257 * returns 0 on success, -errno on failure
1258 */
1259int BMI_tcp_testunexpected(int incount,
1260                           int *outcount,
1261                           struct bmi_method_unexpected_info *info,
1262                           int max_idle_time)
1263{
1264    int ret = -1;
1265    method_op_p query_op = NULL;
1266
1267    gen_mutex_lock(&interface_mutex);
1268
1269    if(op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
1270    {
1271        /* do some ``real work'' here */
1272        ret = tcp_do_work(max_idle_time);
1273        if (ret < 0)
1274        {
1275            gen_mutex_unlock(&interface_mutex);
1276            return (ret);
1277        }
1278    }
1279
1280    *outcount = 0;
1281
1282    /* go through the completed/unexpected list as long as we are finding
1283     * stuff and we have room in the info array for it
1284     */
1285    while ((*outcount < incount) &&
1286           (query_op =
1287            op_list_shownext(op_list_array[IND_COMPLETE_RECV_UNEXP])))
1288    {
1289        info[*outcount].error_code = query_op->error_code;
1290        info[*outcount].addr = query_op->addr;
1291        info[*outcount].buffer = query_op->buffer;
1292        info[*outcount].size = query_op->actual_size;
1293        info[*outcount].tag = query_op->msg_tag;
1294        op_list_remove(query_op);
1295        dealloc_tcp_method_op(query_op);
1296        (*outcount)++;
1297    }
1298    gen_mutex_unlock(&interface_mutex);
1299    return (0);
1300}
1301
1302
1303/* BMI_tcp_testcontext()
1304 *
1305 * Checks to see if any messages from the specified context have completed.
1306 *
1307 * returns 0 on success, -errno on failure
1308 */
1309int BMI_tcp_testcontext(int incount,
1310                     bmi_op_id_t* out_id_array,
1311                     int *outcount,
1312                     bmi_error_code_t * error_code_array,
1313                     bmi_size_t * actual_size_array,
1314                     void **user_ptr_array,
1315                     int max_idle_time,
1316                     bmi_context_id context_id)
1317{
1318    int ret = -1;
1319    method_op_p query_op = NULL;
1320
1321    *outcount = 0;
1322
1323    gen_mutex_lock(&interface_mutex);
1324
1325    if(op_list_empty(completion_array[context_id]))
1326    {
1327        /* if there are unexpected ops ready to go, then short out so
1328         * that the next testunexpected call can pick it up without
1329         * delay
1330         */
1331        if(check_unexpected &&
1332           !op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
1333        {
1334            gen_mutex_unlock(&interface_mutex);
1335            return(0);
1336        }
1337
1338        /* do some ``real work'' here */
1339        ret = tcp_do_work(max_idle_time);
1340        if (ret < 0)
1341        {
1342            gen_mutex_unlock(&interface_mutex);
1343            return (ret);
1344        }
1345    }
1346
1347    /* pop as many items off of the completion queue as we can */
1348    while((*outcount < incount) &&
1349          (query_op =
1350           op_list_shownext(completion_array[context_id])))
1351    {
1352        assert(query_op);
1353        assert(query_op->context_id == context_id);
1354
1355        /* this one's done; pop it out */
1356        op_list_remove(query_op);
1357        error_code_array[*outcount] = query_op->error_code;
1358        actual_size_array[*outcount] = query_op->actual_size;
1359        out_id_array[*outcount] = query_op->op_id;
1360        if (user_ptr_array != NULL)
1361        {
1362            user_ptr_array[*outcount] = query_op->user_ptr;
1363        }
1364
1365        PINT_EVENT_END((query_op->send_recv == BMI_SEND ?
1366                        bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
1367                       bmi_tcp_pid, NULL, query_op->event_id,
1368                       query_op->actual_size);
1369
1370        dealloc_tcp_method_op(query_op);
1371        query_op = NULL;
1372        (*outcount)++;
1373    }
1374
1375    gen_mutex_unlock(&interface_mutex);
1376    return(0);
1377}
1378
1379
1380
1381/* BMI_tcp_post_send_list()
1382 *
1383 * same as the BMI_tcp_post_send() function, except that it sends
1384 * from an array of possibly non contiguous buffers
1385 *
1386 * returns 0 on success, 1 on immediate successful completion,
1387 * -errno on failure
1388 */
1389int BMI_tcp_post_send_list(bmi_op_id_t * id,
1390                           bmi_method_addr_p dest,
1391                           const void *const *buffer_list,
1392                           const bmi_size_t *size_list,
1393                           int list_count,
1394                           bmi_size_t total_size,
1395                           enum bmi_buffer_type buffer_type,
1396                           bmi_msg_tag_t tag,
1397                           void *user_ptr,
1398                           bmi_context_id context_id,
1399                           PVFS_hint hints)
1400{
1401    struct tcp_msg_header my_header;
1402    int ret = -1;
1403
1404    /* clear the id field for safety */
1405    *id = 0;
1406
1407    /* fill in the TCP-specific message header */
1408    if (total_size > TCP_MODE_REND_LIMIT)
1409    {
1410        gossip_lerr("Error: BMI message too large!\n");
1411        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1412    }
1413
1414    if (total_size <= TCP_MODE_EAGER_LIMIT)
1415    {
1416        my_header.mode = TCP_MODE_EAGER;
1417    }
1418    else
1419    {
1420        my_header.mode = TCP_MODE_REND;
1421    }
1422    my_header.tag = tag;
1423    my_header.size = total_size;
1424    my_header.magic_nr = BMI_MAGIC_NR;
1425
1426    gen_mutex_lock(&interface_mutex);
1427
1428    ret = tcp_post_send_generic(id, dest, buffer_list,
1429                                size_list, list_count, buffer_type,
1430                                my_header, user_ptr, context_id, hints);
1431    gen_mutex_unlock(&interface_mutex);
1432    return(ret);
1433}
1434
1435/* BMI_tcp_post_recv_list()
1436 *
1437 * same as the BMI_tcp_post_recv() function, except that it recvs
1438 * into an array of possibly non contiguous buffers
1439 *
1440 * returns 0 on success, 1 on immediate successful completion,
1441 * -errno on failure
1442 */
1443int BMI_tcp_post_recv_list(bmi_op_id_t * id,
1444                           bmi_method_addr_p src,
1445                           void *const *buffer_list,
1446                           const bmi_size_t *size_list,
1447                           int list_count,
1448                           bmi_size_t total_expected_size,
1449                           bmi_size_t * total_actual_size,
1450                           enum bmi_buffer_type buffer_type,
1451                           bmi_msg_tag_t tag,
1452                           void *user_ptr,
1453                           bmi_context_id context_id,
1454                           PVFS_hint hints)
1455{
1456    int ret = -1;
1457
1458    if (total_expected_size > TCP_MODE_REND_LIMIT)
1459    {
1460        return (bmi_tcp_errno_to_pvfs(-EINVAL));
1461    }
1462
1463    gen_mutex_lock(&interface_mutex);
1464
1465    ret = tcp_post_recv_generic(id, src, buffer_list, size_list,
1466                                list_count, total_expected_size,
1467                                total_actual_size, buffer_type, tag, user_ptr,
1468                                context_id, hints);
1469
1470    gen_mutex_unlock(&interface_mutex);
1471    return (ret);
1472}
1473
1474
1475/* BMI_tcp_post_sendunexpected_list()
1476 *
1477 * same as the BMI_tcp_post_sendunexpected() function, except that
1478 * it sends from an array of possibly non contiguous buffers
1479 *
1480 * returns 0 on success, 1 on immediate successful completion,
1481 * -errno on failure
1482 */
1483int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
1484                                     bmi_method_addr_p dest,
1485                                     const void *const *buffer_list,
1486                                     const bmi_size_t *size_list,
1487                                     int list_count,
1488                                     bmi_size_t total_size,
1489                                     enum bmi_buffer_type buffer_type,
1490                                     bmi_msg_tag_t tag,
1491                                     void *user_ptr,
1492                                     bmi_context_id context_id,
1493                                     PVFS_hint hints)
1494{
1495    struct tcp_msg_header my_header;
1496    int ret = -1;
1497
1498    /* clear the id field for safety */
1499    *id = 0;
1500
1501    if (total_size > TCP_MODE_EAGER_LIMIT)
1502    {
1503        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1504    }
1505
1506    my_header.mode = TCP_MODE_UNEXP;
1507    my_header.tag = tag;
1508    my_header.size = total_size;
1509    my_header.magic_nr = BMI_MAGIC_NR;
1510
1511    gen_mutex_lock(&interface_mutex);
1512
1513    ret = tcp_post_send_generic(id, dest, buffer_list,
1514                                size_list, list_count, buffer_type,
1515                                my_header, user_ptr, context_id, hints);
1516
1517    gen_mutex_unlock(&interface_mutex);
1518    return(ret);
1519}
1520
1521
1522/* BMI_tcp_open_context()
1523 *
1524 * opens a new context with the specified context id
1525 *
1526 * returns 0 on success, -errno on failure
1527 */
1528int BMI_tcp_open_context(bmi_context_id context_id)
1529{
1530
1531    gen_mutex_lock(&interface_mutex);
1532
1533    /* start a new queue for tracking completions in this context */
1534    completion_array[context_id] = op_list_new();
1535    if (!completion_array[context_id])
1536    {
1537        gen_mutex_unlock(&interface_mutex);
1538        return(bmi_tcp_errno_to_pvfs(-ENOMEM));
1539    }
1540
1541    gen_mutex_unlock(&interface_mutex);
1542    return(0);
1543}
1544
1545
1546/* BMI_tcp_close_context()
1547 *
1548 * shuts down a context, previously opened with BMI_tcp_open_context()
1549 *
1550 * no return value
1551 */
1552void BMI_tcp_close_context(bmi_context_id context_id)
1553{
1554   
1555    gen_mutex_lock(&interface_mutex);
1556
1557    /* tear down completion queue for this context */
1558    op_list_cleanup(completion_array[context_id]);
1559
1560    gen_mutex_unlock(&interface_mutex);
1561    return;
1562}
1563
1564
1565/* BMI_tcp_cancel()
1566 *
1567 * attempt to cancel a pending bmi tcp operation
1568 *
1569 * returns 0 on success, -errno on failure
1570 */
1571int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id)
1572{
1573    method_op_p query_op = NULL;
1574   
1575    gen_mutex_lock(&interface_mutex);
1576
1577    query_op = (method_op_p)id_gen_fast_lookup(id);
1578    if(!query_op)
1579    {
1580        /* if we can't find the operattion, then assume that it has already
1581         * completed naturally
1582         */
1583        gen_mutex_unlock(&interface_mutex);
1584        return(0);
1585    }
1586
1587    /* easy case: is the operation already completed? */
1588    if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1589        BMI_TCP_COMPLETE)
1590    {
1591        /* only close socket in forceful cancel mode */
1592        if(forceful_cancel_mode)
1593            tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1594        /* we are done! status will be collected during test */
1595        gen_mutex_unlock(&interface_mutex);
1596        return(0);
1597    }
1598
1599    /* has the operation started moving data yet? */
1600    if(query_op->env_amt_complete)
1601    {
1602        /* be pessimistic and kill the socket, even if not in forceful
1603         * cancel mode */
1604        /* NOTE: this may place other operations beside this one into
1605         * EINTR error state
1606         */
1607        tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1608        gen_mutex_unlock(&interface_mutex);
1609        return(0);
1610    }
1611
1612    /* if we fall to this point, op has been posted, but no data has moved
1613     * for it yet as far as we know
1614     */
1615
1616    /* mark op as canceled, move to completion queue */
1617    query_op->error_code = -BMI_ECANCEL;
1618    if(query_op->send_recv == BMI_SEND)
1619    {
1620        BMI_socket_collection_remove_write_bit(tcp_socket_collection_p,
1621                                           query_op->addr);
1622    }
1623    op_list_remove(query_op);
1624    ((struct tcp_op*)(query_op->method_data))->tcp_op_state =
1625        BMI_TCP_COMPLETE;
1626    /* only close socket in forceful cancel mode */
1627    if(forceful_cancel_mode)
1628        tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1629    op_list_add(completion_array[query_op->context_id], query_op);
1630    gen_mutex_unlock(&interface_mutex);
1631    return(0);
1632}
1633
1634/*
1635 * For now, we only support wildcard strings that are IP addresses
1636 * and not *hostnames*!
1637 */
1638static int check_valid_wildcard(const char *wildcard_string, unsigned long *octets)
1639{
1640    int i, len = strlen(wildcard_string), last_dot = -1, octet_count = 0;
1641    char str[16];
1642    for (i = 0; i < len; i++)
1643    {
1644        char c = wildcard_string[i];
1645        memset(str, 0, 16);
1646        if ((c < '0' || c > '9') && c != '*' && c != '.')
1647            return -EINVAL;
1648        if (c == '*') {
1649            if (octet_count >= 4)
1650                return -EINVAL;
1651            octets[octet_count++] = 256;
1652        }
1653        else if (c == '.')
1654        {
1655            char *endptr = NULL;
1656            if (octet_count >= 4)
1657                return -EINVAL;
1658            strncpy(str, &wildcard_string[last_dot + 1], (i - last_dot - 1));
1659            octets[octet_count++] = strtol(str, &endptr, 10);
1660            if (*endptr != '\0' || octets[octet_count-1] >= 256)
1661                return -EINVAL;
1662            last_dot = i;
1663        }
1664    }
1665    for (i = octet_count; i < 4; i++)
1666    {
1667         octets[i] = 256;
1668    }
1669    return 0;
1670}
1671
1672/*
1673 * return 1 if the addr specified is part of the wildcard specification of octet
1674 * return 0 otherwise.
1675 */
1676static int check_octets(struct in_addr addr, unsigned long *octets)
1677{
1678#define B1_MASK  0xff000000
1679#define B1_SHIFT 24
1680#define B2_MASK  0x00ff0000
1681#define B2_SHIFT 16
1682#define B3_MASK  0x0000ff00
1683#define B3_SHIFT 8
1684#define B4_MASK  0x000000ff
1685    uint32_t host_addr = ntohl(addr.s_addr);
1686    /* * stands for all clients */
1687    if (octets[0] == 256)
1688    {
1689        return 1;
1690    }
1691    if (((host_addr & B1_MASK) >> B1_SHIFT) != octets[0])
1692    {
1693        return 0;
1694    }
1695    if (octets[1] == 256)
1696    {
1697        return 1;
1698    }
1699    if (((host_addr & B2_MASK) >> B2_SHIFT) != octets[1])
1700    {
1701        return 0;
1702    }
1703    if (octets[2] == 256)
1704    {
1705        return 1;
1706    }
1707    if (((host_addr & B3_MASK) >> B3_SHIFT) != octets[2])
1708    {
1709        return 0;
1710    }
1711    if (octets[3] == 256)
1712    {
1713        return 1;
1714    }
1715    if ((host_addr & B4_MASK) != octets[3])
1716    {
1717        return 0;
1718    }
1719    return 1;
1720#undef B1_MASK
1721#undef B1_SHIFT
1722#undef B2_MASK
1723#undef B2_SHIFT
1724#undef B3_MASK
1725#undef B3_SHIFT
1726#undef B4_MASK
1727}
1728/* BMI_tcp_query_addr_range()
1729 * Check if a given address is within the network specified by the wildcard string!
1730 * or if it is part of the subnet mask specified
1731 */
1732int BMI_tcp_query_addr_range(bmi_method_addr_p map, const char *wildcard_string, int netmask)
1733{
1734    struct tcp_addr *tcp_addr_data = map->method_data;
1735    struct sockaddr_in map_addr;
1736    socklen_t map_addr_len = sizeof(map_addr);
1737    const char *tcp_wildcard = wildcard_string + 6 /* strlen("tcp://") */;
1738    int ret = -1;
1739
1740    memset(&map_addr, 0, sizeof(map_addr));
1741    if(getpeername(tcp_addr_data->socket, (struct sockaddr *) &map_addr, &map_addr_len) < 0)
1742    {
1743        ret =  bmi_tcp_errno_to_pvfs(-EINVAL);
1744        gossip_err("Error: failed to retrieve peer name for client.\n");
1745        return(ret);
1746    }
1747    /* Wildcard specification */
1748    if (netmask == -1)
1749    {
1750        unsigned long octets[4];
1751        if (check_valid_wildcard(tcp_wildcard, octets) < 0)
1752        {
1753            gossip_lerr("Invalid wildcard specification: %s\n", tcp_wildcard);
1754            return -EINVAL;
1755        }
1756        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Map Address is : %s, Wildcard Octets: %lu.%lu.%lu.%lu\n", inet_ntoa(map_addr.sin_addr),
1757                octets[0], octets[1], octets[2], octets[3]);
1758        if (check_octets(map_addr.sin_addr, octets) == 1)
1759        {
1760            return 1;
1761        }
1762    }
1763    /* Netmask specification */
1764    else {
1765        struct sockaddr_in mask_addr, network_addr;
1766        memset(&mask_addr, 0, sizeof(mask_addr));
1767        memset(&network_addr, 0, sizeof(network_addr));
1768        /* Convert the netmask address */
1769        convert_mask(netmask, &mask_addr.sin_addr);
1770        /* Invalid network address */
1771        if (inet_aton(tcp_wildcard, &network_addr.sin_addr) == 0)
1772        {
1773            gossip_err("Invalid network specification: %s\n", tcp_wildcard);
1774            return -EINVAL;
1775        }
1776        /* Matches the subnet mask! */
1777        if ((map_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr)
1778                == (network_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr))
1779        {
1780            return 1;
1781        }
1782    }
1783    return 0;
1784}
1785
1786/* BMI_tcp_addr_rev_lookup_unexpected()
1787 *
1788 * looks up an address that was initialized unexpectedly and returns a string
1789 * hostname
1790 *
1791 * returns string on success, "UNKNOWN" on failure
1792 */
1793const char* BMI_tcp_addr_rev_lookup_unexpected(bmi_method_addr_p map)
1794{
1795    struct tcp_addr *tcp_addr_data = map->method_data;
1796    int debug_on;
1797    uint64_t mask;
1798    socklen_t peerlen;
1799    struct sockaddr_in peer;
1800    int ret;
1801    struct hostent *peerent;
1802    char* tmp_peer;
1803
1804    /* return default response if we don't have support for the right socket
1805     * calls
1806     */
1807#if !defined(HAVE_GETHOSTBYADDR)
1808    return(tcp_addr_data->peer);
1809#else
1810
1811    /* Only resolve hostnames if a gossip mask is set to request it.
1812     * Otherwise we leave it at ip address
1813     */
1814    gossip_get_debug_mask(&debug_on, &mask);
1815
1816    if(!debug_on || (!(mask & GOSSIP_ACCESS_HOSTNAMES)))
1817    {
1818        return(tcp_addr_data->peer);
1819    }
1820
1821    peerlen = sizeof(struct sockaddr_in);
1822
1823    if(tcp_addr_data->peer_type == BMI_TCP_PEER_HOSTNAME)
1824    {
1825        /* full hostname already cached; return now */
1826        return(tcp_addr_data->peer);
1827    }
1828
1829    /* if we hit this point, we need to resolve hostname */
1830    ret = getpeername(tcp_addr_data->socket, (struct sockaddr*)&(peer), &peerlen);
1831    if(ret < 0)
1832    {
1833        /* default to use IP address */
1834        return(tcp_addr_data->peer);
1835    }
1836
1837    peerent = gethostbyaddr((void*)&peer.sin_addr.s_addr,
1838        sizeof(struct in_addr), AF_INET);
1839    if(peerent == NULL)
1840    {
1841        /* default to use IP address */
1842        return(tcp_addr_data->peer);
1843    }
1844 
1845    tmp_peer = (char*)malloc(strlen(peerent->h_name) + 1);
1846    if(!tmp_peer)
1847    {
1848        /* default to use IP address */
1849        return(tcp_addr_data->peer);
1850    }
1851    strcpy(tmp_peer, peerent->h_name);
1852    if(tcp_addr_data->peer)
1853    {
1854        free(tcp_addr_data->peer);
1855    }
1856    tcp_addr_data->peer = tmp_peer;
1857    tcp_addr_data->peer_type = BMI_TCP_PEER_HOSTNAME;
1858    return(tcp_addr_data->peer);
1859
1860#endif
1861
1862}
1863
1864/* tcp_forget_addr()
1865 *
1866 * completely removes a tcp method address from use, and aborts any
1867 * operations that use the address.  If the
1868 * dealloc_flag is set, the memory used by the address will be
1869 * deallocated as well.
1870 *
1871 * no return value
1872 */
1873void tcp_forget_addr(bmi_method_addr_p map,
1874                     int dealloc_flag,
1875                     int error_code)
1876{
1877    struct tcp_addr* tcp_addr_data = map->method_data;
1878    BMI_addr_t bmi_addr = tcp_addr_data->bmi_addr;
1879    int tmp_outcount;
1880    bmi_method_addr_p tmp_addr;
1881    int tmp_status;
1882
1883    if (tcp_socket_collection_p)
1884    {
1885        BMI_socket_collection_remove(tcp_socket_collection_p, map);
1886        /* perform a test to force the socket collection to act on the remove
1887         * request before continuing
1888         */
1889        if(!sc_test_busy)
1890        {
1891            BMI_socket_collection_testglobal(tcp_socket_collection_p,
1892                0, &tmp_outcount, &tmp_addr, &tmp_status, 0);
1893        }
1894    }
1895
1896    tcp_shutdown_addr(map);
1897    tcp_cleanse_addr(map, error_code);
1898    tcp_addr_data->addr_error = error_code;
1899    if (dealloc_flag)
1900    {
1901        dealloc_tcp_method_addr(map);
1902    }
1903    else
1904    {
1905        /* this will cause the bmi control layer to check to see if
1906         * this address can be completely forgotten
1907         */
1908        bmi_method_addr_forget_callback(bmi_addr);
1909    }
1910    return;
1911};
1912
1913/******************************************************************
1914 * Internal support functions
1915 */
1916
1917
1918/*
1919 * dealloc_tcp_method_addr()
1920 *
1921 * destroys method address structures generated by the TCP/IP module.
1922 *
1923 * no return value
1924 */
1925static void dealloc_tcp_method_addr(bmi_method_addr_p map)
1926{
1927
1928    struct tcp_addr *tcp_addr_data = NULL;
1929
1930    tcp_addr_data = map->method_data;
1931    /* close the socket, as long as it is not the one we are listening on
1932     * as a server.
1933     */
1934    if (!tcp_addr_data->server_port)
1935    {
1936        if (tcp_addr_data->socket > -1)
1937        {
1938            close(tcp_addr_data->socket);
1939        }
1940    }
1941
1942    if (tcp_addr_data->hostname)
1943        free(tcp_addr_data->hostname);
1944    if (tcp_addr_data->peer)
1945        free(tcp_addr_data->peer);
1946
1947    bmi_dealloc_method_addr(map);
1948
1949    return;
1950}
1951
1952
1953/*
1954 * alloc_tcp_method_addr()
1955 *
1956 * creates a new method address with defaults filled in for TCP/IP.
1957 *
1958 * returns pointer to struct on success, NULL on failure
1959 */
1960bmi_method_addr_p alloc_tcp_method_addr(void)
1961{
1962
1963    struct bmi_method_addr *my_method_addr = NULL;
1964    struct tcp_addr *tcp_addr_data = NULL;
1965
1966    my_method_addr =
1967        bmi_alloc_method_addr(tcp_method_params.method_id, sizeof(struct tcp_addr));
1968    if (!my_method_addr)
1969    {
1970        return (NULL);
1971    }
1972
1973    /* note that we trust the alloc_method_addr() function to have zeroed
1974     * out the structures for us already
1975     */
1976
1977    tcp_addr_data = my_method_addr->method_data;
1978    tcp_addr_data->socket = -1;
1979    tcp_addr_data->port = -1;
1980    tcp_addr_data->map = my_method_addr;
1981    tcp_addr_data->sc_index = -1;
1982
1983    return (my_method_addr);
1984}
1985
1986
1987/*
1988 * tcp_server_init()
1989 *
1990 * this function is used to prepare a node to recieve incoming
1991 * connections if it is initialized in a server configuration.   
1992 *
1993 * returns 0 on succes, -errno on failure
1994 */
1995static int tcp_server_init(void)
1996{
1997
1998    int oldfl = 0;              /* old socket flags */
1999    struct tcp_addr *tcp_addr_data = NULL;
2000    int tmp_errno = bmi_tcp_errno_to_pvfs(-EINVAL);
2001    int ret = 0;
2002
2003    /* create a socket */
2004    tcp_addr_data = tcp_method_params.listen_addr->method_data;
2005    if ((tcp_addr_data->socket = BMI_sockio_new_sock()) < 0)
2006    {
2007        tmp_errno = errno;
2008        gossip_err("Error: BMI_sockio_new_sock: %s\n", strerror(tmp_errno));
2009        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2010    }
2011
2012    /* set it to non-blocking operation */
2013    oldfl = fcntl(tcp_addr_data->socket, F_GETFL, 0);
2014    if (!(oldfl & O_NONBLOCK))
2015    {
2016        fcntl(tcp_addr_data->socket, F_SETFL, oldfl | O_NONBLOCK);
2017    }
2018
2019    /* setup for a fast restart to avoid bind addr in use errors */
2020    BMI_sockio_set_sockopt(tcp_addr_data->socket, SO_REUSEADDR, 1);
2021
2022    /* bind it to the appropriate port */
2023    if(tcp_method_params.method_flags & BMI_TCP_BIND_SPECIFIC)
2024    {
2025        ret = BMI_sockio_bind_sock_specific(tcp_addr_data->socket,
2026            tcp_addr_data->hostname,
2027            tcp_addr_data->port);
2028        /* NOTE: this particular function converts errno in advance */
2029        if(ret < 0)
2030        {
2031            PVFS_perror_gossip("BMI_sockio_bind_sock_specific", ret);
2032            return(ret);
2033        }
2034    }
2035    else
2036    {
2037        ret = BMI_sockio_bind_sock(tcp_addr_data->socket,
2038            tcp_addr_data->port);
2039    }
2040   
2041    if (ret < 0)
2042    {
2043        tmp_errno = errno;
2044        gossip_err("Error: BMI_sockio_bind_sock: %s\n", strerror(tmp_errno));
2045        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2046    }
2047
2048    /* go ahead and listen to the socket */
2049    if (listen(tcp_addr_data->socket, TCP_BACKLOG) != 0)
2050    {
2051        tmp_errno = errno;
2052        gossip_err("Error: listen: %s\n", strerror(tmp_errno));
2053        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2054    }
2055
2056    return (0);
2057}
2058
2059
2060/* find_recv_inflight()
2061 *
2062 * checks to see if there is a recv operation in flight (when in flight
2063 * means that some of the data or envelope has been read) for a
2064 * particular address.
2065 *
2066 * returns pointer to operation on success, NULL if nothing found.
2067 */
2068static method_op_p find_recv_inflight(bmi_method_addr_p map)
2069{
2070    struct op_list_search_key key;
2071    method_op_p query_op = NULL;
2072
2073    memset(&key, 0, sizeof(struct op_list_search_key));
2074    key.method_addr = map;
2075    key.method_addr_yes = 1;
2076
2077    query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
2078
2079    return (query_op);
2080}
2081
2082
2083/* tcp_sock_init()
2084 *
2085 * this is an internal function which is used to build up a TCP/IP
2086 * connection in the situation of a client side operation.
2087 * addressing information to determine which fields need to be set.
2088 * If the connection is already established then it does no work.
2089 *
2090 * NOTE: this is safe to call repeatedly.  However, always check the
2091 * value of the not_connected field in the tcp address before using the
2092 * address.
2093 *
2094 * returns 0 on success, -errno on failure
2095 */
2096static int tcp_sock_init(bmi_method_addr_p my_method_addr)
2097{
2098
2099    int oldfl = 0;              /* socket flags */
2100    int ret = -1;
2101    struct pollfd poll_conn;
2102    struct tcp_addr *tcp_addr_data = my_method_addr->method_data;
2103    int tmp_errno = 0;
2104
2105    /* check for obvious problems */
2106    assert(my_method_addr);
2107    assert(my_method_addr->method_type == tcp_method_params.method_id);
2108    assert(tcp_addr_data->server_port == 0);
2109
2110    /* fail immediately if the address is in failure mode and we have no way
2111     * to reconnect
2112     */
2113    if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
2114    {
2115        gossip_debug(GOSSIP_BMI_DEBUG_TCP,
2116        "Warning: BMI communication attempted on an address in failure mode.\n");
2117        return(tcp_addr_data->addr_error);
2118    }
2119
2120    if(tcp_addr_data->addr_error)
2121    {
2122        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "%s: attempting reconnect.\n",
2123          __func__);
2124        tcp_addr_data->addr_error = 0;
2125        assert(tcp_addr_data->socket < 0);
2126        tcp_addr_data->not_connected = 1;
2127    }
2128
2129    /* is there already a socket? */
2130    if (tcp_addr_data->socket > -1)
2131    {
2132        /* check to see if we still need to work on the connect.. */
2133        if (tcp_addr_data->not_connected)
2134        {
2135            /* this is a little weird, but we complete the nonblocking
2136             * connection by polling */
2137            poll_conn.fd = tcp_addr_data->socket;
2138            poll_conn.events = POLLOUT;
2139            ret = poll(&poll_conn, 1, 2);
2140            if ((ret < 0) || (poll_conn.revents & POLLERR))
2141            {
2142                tmp_errno = errno;
2143                gossip_lerr("Error: poll: %s\n", strerror(tmp_errno));
2144                return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2145            }
2146            if (poll_conn.revents & POLLOUT)
2147            {
2148                tcp_addr_data->not_connected = 0;
2149            }
2150        }
2151        /* return.  the caller should check the "not_connected" flag to
2152         * see if the socket is usable yet. */
2153        return (0);
2154    }
2155   
2156    bmi_set_sock_buffers(tcp_addr_data->socket);
2157
2158    /* at this point there is no socket.  try to build it */
2159    if (tcp_addr_data->port < 1)
2160    {
2161        return (bmi_tcp_errno_to_pvfs(-EINVAL));
2162    }
2163
2164    /* make a socket */
2165    if ((tcp_addr_data->socket = BMI_sockio_new_sock()) < 0)
2166    {
2167        tmp_errno = errno;
2168        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2169    }
2170
2171    /* set it to non-blocking operation */
2172    oldfl = fcntl(tcp_addr_data->socket, F_GETFL, 0);
2173    if (!(oldfl & O_NONBLOCK))
2174    {
2175        fcntl(tcp_addr_data->socket, F_SETFL, oldfl | O_NONBLOCK);
2176    }
2177
2178#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
2179    /* make sure if we need to bind or not to some local port ranges */
2180    tcp_enable_trusted(tcp_addr_data);
2181#endif
2182
2183    /* turn off Nagle's algorithm */
2184    if (BMI_sockio_set_tcpopt(tcp_addr_data->socket, TCP_NODELAY, 1) < 0)
2185    {
2186        tmp_errno = errno;
2187        gossip_lerr("Error: failed to set TCP_NODELAY option.\n");
2188        close(tcp_addr_data->socket);
2189        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2190    }
2191
2192       bmi_set_sock_buffers(tcp_addr_data->socket);
2193
2194    if (tcp_addr_data->hostname)
2195    {
2196        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
2197                      "Connect: socket=%d, hostname=%s, port=%d\n",
2198                      tcp_addr_data->socket, tcp_addr_data->hostname,
2199                      tcp_addr_data->port);
2200        ret = BMI_sockio_connect_sock(tcp_addr_data->socket,
2201                      tcp_addr_data->hostname,
2202                      tcp_addr_data->port);
2203    }
2204    else
2205    {
2206        return (bmi_tcp_errno_to_pvfs(-EINVAL));
2207    }
2208
2209    if (ret < 0)
2210    {
2211        if (ret == -EINPROGRESS)
2212        {
2213            tcp_addr_data->not_connected = 1;
2214            /* this will have to be connected later with a poll */
2215        }
2216        else
2217        {
2218            /* NOTE: BMI_sockio_connect_sock returns a PVFS error */
2219            char buff[300];
2220
2221            snprintf(buff, 300, "Error: BMI_sockio_connect_sock: (%s):",
2222                     tcp_addr_data->hostname);
2223
2224            PVFS_perror_gossip(buff, ret);
2225            return (ret);
2226        }
2227    }
2228
2229    return (0);
2230}
2231
2232
2233/* enqueue_operation()
2234 *
2235 * creates a new operation based on the arguments to the function.  It
2236 * then makes sure that the address is added to the socket collection,
2237 * and the operation is added to the appropriate operation queue.
2238 *
2239 * Damn, what a big prototype!
2240 *
2241 * returns 0 on success, -errno on failure
2242 */
2243static int enqueue_operation(op_list_p target_list,
2244                             enum bmi_op_type send_recv,
2245                             bmi_method_addr_p map,
2246                             void *const *buffer_list,
2247                             const bmi_size_t *size_list,
2248                             int list_count,
2249                             bmi_size_t amt_complete,
2250                             bmi_size_t env_amt_complete,
2251                             bmi_op_id_t * id,
2252                             int tcp_op_state,
2253                             struct tcp_msg_header header,
2254                             void *user_ptr,
2255                             bmi_size_t actual_size,
2256                             bmi_size_t expected_size,
2257                             bmi_context_id context_id,
2258                             int32_t eid)
2259{
2260    method_op_p new_method_op = NULL;
2261    struct tcp_op *tcp_op_data = NULL;
2262    struct tcp_addr* tcp_addr_data = NULL;
2263    int i;
2264
2265    /* allocate the operation structure */
2266    new_method_op = alloc_tcp_method_op();
2267    if (!new_method_op)
2268    {
2269        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
2270    }
2271
2272    *id = new_method_op->op_id;
2273    new_method_op->event_id = eid;
2274
2275    /* set the fields */
2276    new_method_op->send_recv = send_recv;
2277    new_method_op->addr = map;
2278    new_method_op->user_ptr = user_ptr;
2279    /* this is on purpose; we want to use the buffer_list all of
2280     * the time, no special case for one contig buffer
2281     */
2282    new_method_op->buffer = NULL;
2283    new_method_op->actual_size = actual_size;
2284    new_method_op->expected_size = expected_size;
2285    new_method_op->send_recv = send_recv;
2286    new_method_op->amt_complete = amt_complete;
2287    new_method_op->env_amt_complete = env_amt_complete;
2288    new_method_op->msg_tag = header.tag;
2289    new_method_op->mode = header.mode;
2290    new_method_op->list_count = list_count;
2291    new_method_op->context_id = context_id;
2292
2293    /* set our current position in list processing */
2294    i=0;
2295    new_method_op->list_index = 0;
2296    new_method_op->cur_index_complete = 0;
2297    while(amt_complete > 0)
2298    {
2299        if(amt_complete >= size_list[i])
2300        {
2301            amt_complete -= size_list[i];
2302            new_method_op->list_index++;
2303            i++;
2304        }
2305        else
2306        {
2307            new_method_op->cur_index_complete = amt_complete;
2308            amt_complete = 0;
2309        }
2310    }
2311
2312    tcp_op_data = new_method_op->method_data;
2313    tcp_op_data->tcp_op_state = tcp_op_state;
2314    tcp_op_data->env = header;
2315
2316    /* if there is only one item in the list, then keep the list stored
2317     * in the op structure.  This allows us to use the same code for send
2318     * and recv as we use for send_list and recv_list, without having to
2319     * malloc lists for those special cases
2320     */
2321    if (list_count == 1)
2322    {
2323        new_method_op->buffer_list = &tcp_op_data->buffer_list_stub;
2324        new_method_op->size_list = &tcp_op_data->size_list_stub;
2325        ((void**)new_method_op->buffer_list)[0] = buffer_list[0];
2326        ((bmi_size_t*)new_method_op->size_list)[0] = size_list[0];
2327    }
2328    else
2329    {
2330        new_method_op->size_list = size_list;
2331        new_method_op->buffer_list = buffer_list;
2332    }
2333
2334    tcp_addr_data = map->method_data;
2335
2336    if(tcp_addr_data->addr_error)
2337    {
2338        /* server should always fail here, client should let receives queue
2339         * as if nothing were wrong
2340         */
2341        if(tcp_addr_data->dont_reconnect || send_recv == BMI_SEND)
2342        {
2343            gossip_debug(GOSSIP_BMI_DEBUG_TCP,
2344                       "Warning: BMI communication attempted on an "
2345                       "address in failure mode.\n");
2346            new_method_op->error_code = tcp_addr_data->addr_error;
2347            op_list_add(op_list_array[new_method_op->context_id],
2348                        new_method_op);
2349            return(tcp_addr_data->addr_error);
2350        }
2351    }
2352
2353#if 0
2354    if(tcp_addr_data->addr_error)
2355    {
2356        /* this address is bad, don't try to do anything with it */
2357        gossip_err("Warning: BMI communication attempted on an "
2358                   "address in failure mode.\n");
2359
2360        new_method_op->error_code = tcp_addr_data->addr_error;
2361        op_list_add(op_list_array[new_method_op->context_id],
2362                    new_method_op);
2363        return(tcp_addr_data->addr_error);
2364    }
2365#endif
2366
2367    /* add the socket to poll on */
2368    BMI_socket_collection_add(tcp_socket_collection_p, map);
2369    if(send_recv == BMI_SEND)
2370    {
2371        BMI_socket_collection_add_write_bit(tcp_socket_collection_p, map);
2372    }
2373
2374    /* keep up with the operation */
2375    op_list_add(target_list, new_method_op);
2376
2377    return (0);
2378}
2379
2380
2381/* tcp_post_recv_generic()
2382 *
2383 * does the real work of posting an operation - works for both
2384 * eager and rendezvous messages
2385 *
2386 * returns 0 on success that requires later poll, returns 1 on instant
2387 * completion, -errno on failure
2388 */
2389static int tcp_post_recv_generic(bmi_op_id_t * id,
2390                                 bmi_method_addr_p src,
2391                                 void *const *buffer_list,
2392                                 const bmi_size_t *size_list,
2393                                 int list_count,
2394                                 bmi_size_t expected_size,
2395                                 bmi_size_t * actual_size,
2396                                 enum bmi_buffer_type buffer_type,
2397                                 bmi_msg_tag_t tag,
2398                                 void *user_ptr,
2399                                 bmi_context_id context_id,
2400                                 PVFS_hint hints)
2401{
2402    method_op_p query_op = NULL;
2403    int ret = -1;
2404    struct tcp_addr *tcp_addr_data = NULL;
2405    struct tcp_op *tcp_op_data = NULL;
2406    struct tcp_msg_header bogus_header;
2407    struct op_list_search_key key;
2408    bmi_size_t copy_size = 0;
2409    bmi_size_t total_copied = 0;
2410    int i;
2411    PINT_event_id eid = 0;
2412
2413    PINT_EVENT_START(
2414        bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, &eid,
2415        PINT_HINT_GET_CLIENT_ID(hints),
2416        PINT_HINT_GET_REQUEST_ID(hints),
2417        PINT_HINT_GET_RANK(hints),
2418        PINT_HINT_GET_HANDLE(hints),
2419        PINT_HINT_GET_OP_ID(hints),
2420        expected_size);
2421
2422    tcp_addr_data = src->method_data;
2423
2424    /* short out immediately if the address is bad and we have no way to
2425     * reconnect
2426     */
2427    if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
2428    {
2429        gossip_debug(
2430            GOSSIP_BMI_DEBUG_TCP,
2431            "Warning: BMI communication attempted "
2432            "on an address in failure mode.\n");
2433        return(tcp_addr_data->addr_error);
2434    }
2435
2436    /* lets make sure that the message hasn't already been fully
2437     * buffered in eager mode before doing anything else
2438     */
2439    memset(&key, 0, sizeof(struct op_list_search_key));
2440    key.method_addr = src;
2441    key.method_addr_yes = 1;
2442    key.msg_tag = tag;
2443    key.msg_tag_yes = 1;
2444
2445    query_op =
2446        op_list_search(op_list_array[IND_RECV_EAGER_DONE_BUFFERING], &key);
2447    if (query_op)
2448    {
2449        /* make sure it isn't too big */
2450        if (query_op->actual_size > expected_size)
2451        {
2452            gossip_err("Error: message ordering violation;\n");
2453            gossip_err("Error: message too large for next buffer.\n");
2454            return (bmi_tcp_errno_to_pvfs(-EPROTO));
2455        }
2456
2457        /* whoohoo- it is already done! */
2458        /* copy buffer out to list segments; handle short case */
2459        for (i = 0; i < list_count; i++)
2460        {
2461            copy_size = size_list[i];
2462            if (copy_size + total_copied > query_op->actual_size)
2463            {
2464                copy_size = query_op->actual_size - total_copied;
2465            }
2466            memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
2467                                             total_copied), copy_size);
2468            total_copied += copy_size;
2469            if (total_copied == query_op->actual_size)
2470            {
2471                break;
2472            }
2473        }
2474        /* copy out to correct memory regions */
2475        (*actual_size) = query_op->actual_size;
2476        free(query_op->buffer);
2477        *id = 0;
2478        op_list_remove(query_op);
2479        dealloc_tcp_method_op(query_op);
2480        PINT_EVENT_END(bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid, 0,
2481                       *actual_size);
2482
2483        return (1);
2484    }
2485
2486    /* look for a message that is already being received */
2487    query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
2488    if (query_op)
2489    {
2490        tcp_op_data = query_op->method_data;
2491    }
2492
2493    /* see if it is being buffered into a temporary memory region */
2494    if (query_op && tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
2495    {
2496        /* make sure it isn't too big */
2497        if (query_op->actual_size > expected_size)
2498        {
2499            gossip_err("Error: message ordering violation;\n");
2500            gossip_err("Error: message too large for next buffer.\n");
2501            return (bmi_tcp_errno_to_pvfs(-EPROTO));
2502        }
2503
2504        /* copy what we have so far into the correct buffers */
2505        total_copied = 0;
2506        for (i = 0; i < list_count; i++)
2507        {
2508            copy_size = size_list[i];
2509            if (copy_size + total_copied > query_op->amt_complete)
2510            {
2511                copy_size = query_op->amt_complete - total_copied;
2512            }
2513            if (copy_size > 0)
2514            {
2515                memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
2516                                                 total_copied), copy_size);
2517            }
2518            total_copied += copy_size;
2519            if (total_copied == query_op->amt_complete)
2520            {
2521                query_op->list_index = i;
2522                query_op->cur_index_complete = copy_size;
2523                break;
2524            }
2525        }
2526
2527        /* see if we ended on a buffer boundary */
2528        if (query_op->cur_index_complete ==
2529            query_op->size_list[query_op->list_index])
2530        {
2531            query_op->list_index++;
2532            query_op->cur_index_complete = 0;
2533        }
2534
2535        /* release the old buffer */
2536        if (query_op->buffer)
2537        {
2538            free(query_op->buffer);
2539        }
2540
2541        *id = query_op->op_id;
2542        tcp_op_data = query_op->method_data;
2543        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
2544
2545        query_op->list_count = list_count;
2546        query_op->user_ptr = user_ptr;
2547        query_op->context_id = context_id;
2548        /* if there is only one item in the list, then keep the list stored
2549         * in the op structure.  This allows us to use the same code for send
2550         * and recv as we use for send_list and recv_list, without having to
2551         * malloc lists for those special cases
2552         */
2553        if (list_count == 1)
2554        {
2555            query_op->buffer_list = &tcp_op_data->buffer_list_stub;
2556            query_op->size_list = &tcp_op_data->size_list_stub;
2557            ((void **)query_op->buffer_list)[0] = buffer_list[0];
2558            ((bmi_size_t *)query_op->size_list)[0] = size_list[0];
2559        }
2560        else
2561        {
2562            query_op->buffer_list = buffer_list;
2563            query_op->size_list = size_list;
2564        }
2565
2566        if (query_op->amt_complete < query_op->actual_size)
2567        {
2568            /* try to recv some more data */
2569            tcp_addr_data = query_op->addr->method_data;
2570            ret = payload_progress(tcp_addr_data->socket,
2571                                   query_op->buffer_list,
2572                                   query_op->size_list,
2573                                   query_op->list_count,
2574                                   query_op->actual_size,
2575                                   &(query_op->list_index),
2576                                   &(query_op->cur_index_complete),
2577                                   BMI_RECV,
2578                                   NULL,
2579                                   0);
2580            if (ret < 0)
2581            {
2582                PVFS_perror_gossip("Error: payload_progress", ret);
2583                /* payload_progress() returns BMI error codes */
2584                tcp_forget_addr(query_op->addr, 0, ret);
2585                return (ret);
2586            }
2587
2588            query_op->amt_complete += ret;
2589        }
2590        assert(query_op->amt_complete <= query_op->actual_size);
2591        if (query_op->amt_complete == query_op->actual_size)
2592        {
2593            /* we are done */
2594            op_list_remove(query_op);
2595            *id = 0;
2596            (*actual_size) = query_op->actual_size;
2597            dealloc_tcp_method_op(query_op);
2598            PINT_EVENT_END(
2599                bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid,
2600                0, *actual_size);
2601
2602            return (1);
2603        }
2604        else
2605        {
2606            /* there is still more work to do */
2607            tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
2608            return (0);
2609        }
2610    }
2611
2612    /* NOTE: if the message was in flight, but not buffering, then
2613     * that means that it has already matched an earlier receive
2614     * post or else is an unexpected message that doesn't require a
2615     * matching receive post - at any rate it shouldn't be handled
2616     * here
2617     */
2618
2619    /* if we hit this point we must enqueue */
2620    if (expected_size <= TCP_MODE_EAGER_LIMIT)
2621    {
2622        bogus_header.mode = TCP_MODE_EAGER;
2623    }
2624    else
2625    {
2626        bogus_header.mode = TCP_MODE_REND;
2627    }
2628    bogus_header.tag = tag;
2629    ret = enqueue_operation(op_list_array[IND_RECV],
2630                            BMI_RECV, src, buffer_list, size_list,
2631                            list_count, 0, 0, id, BMI_TCP_INPROGRESS,
2632                            bogus_header, user_ptr, 0,
2633                            expected_size, context_id, eid);
2634    /* just for safety; this field isn't valid to the caller anymore */
2635    (*actual_size) = 0;
2636    /* TODO: figure out why this causes deadlocks; observable in 2
2637     * scenarios:
2638     * - pvfs2-client-core with threaded library and nptl
2639     * - pvfs2-server threaded with nptl sending messages to itself
2640     */
2641#if 0
2642    if (ret >= 0)
2643    {
2644        /* go ahead and try to do some work while we are in this
2645         * function since we appear to be backlogged.  Make sure that
2646         * we do not wait in the poll, however.
2647         */
2648        ret = tcp_do_work(0);
2649    }
2650#endif
2651    return (ret);
2652}
2653
2654
2655/* tcp_cleanse_addr()
2656 *
2657 * finds all active operations matching the given address, places them
2658 * in an error state, and moves them to the completed queue.
2659 *
2660 * NOTE: this function does not shut down the address.  That should be
2661 * handled separately
2662 *
2663 * returns 0 on success, -errno on failure
2664 */
2665static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code)
2666{
2667    int i = 0;
2668    struct op_list_search_key key;
2669    method_op_p query_op = NULL;
2670
2671    memset(&key, 0, sizeof(struct op_list_search_key));
2672    key.method_addr = map;
2673    key.method_addr_yes = 1;
2674
2675    /* NOTE: we know the unexpected completed queue is the last index! */
2676    for (i = 0; i < (NUM_INDICES - 1); i++)
2677    {
2678        if (op_list_array[i])
2679        {
2680            while ((query_op = op_list_search(op_list_array[i], &key)))
2681            {
2682                op_list_remove(query_op);
2683                query_op->error_code = error_code;
2684                if (query_op->mode == TCP_MODE_UNEXP && query_op->send_recv
2685                    == BMI_RECV)
2686                {
2687                    op_list_add(op_list_array[IND_COMPLETE_RECV_UNEXP],
2688                                query_op);
2689                }
2690                else
2691                {
2692                    ((struct tcp_op*)(query_op->method_data))->tcp_op_state =
2693                        BMI_TCP_COMPLETE;
2694                    op_list_add(completion_array[query_op->context_id], query_op);
2695                }
2696            }
2697        }
2698    }
2699
2700    return (0);
2701}
2702
2703
2704/* tcp_shutdown_addr()
2705 *
2706 * closes connections associated with a tcp method address
2707 *
2708 * returns 0 on success, -errno on failure
2709 */
2710static int tcp_shutdown_addr(bmi_method_addr_p map)
2711{
2712
2713    struct tcp_addr *tcp_addr_data = map->method_data;
2714    if (tcp_addr_data->socket > -1)
2715    {
2716        close(tcp_addr_data->socket);
2717    }
2718    tcp_addr_data->socket = -1;
2719    tcp_addr_data->not_connected = 1;
2720
2721    return (0);
2722}
2723
2724
2725/* tcp_do_work()
2726 *
2727 * this is the function that actually does communication work during
2728 * BMI_tcp_testXXX and BMI_tcp_waitXXX functions.  The amount of work
2729 * that it does is tunable.
2730 *
2731 * returns 0 on success, -errno on failure.
2732 */
2733static int tcp_do_work(int max_idle_time)
2734{
2735    int ret = -1;
2736    bmi_method_addr_p addr_array[TCP_WORK_METRIC];
2737    int status_array[TCP_WORK_METRIC];
2738    int socket_count = 0;
2739    int i = 0;
2740    int stall_flag = 0;
2741    int busy_flag = 1;
2742    struct timespec req;
2743    struct tcp_addr* tcp_addr_data = NULL;
2744    struct timespec wait_time;
2745    struct timeval start;
2746
2747    if(sc_test_busy)
2748    {
2749        /* another thread is already polling or working on sockets */
2750        if(max_idle_time == 0)
2751        {
2752            /* we don't want to spend time waiting on it; return
2753             * immediately.
2754             */
2755            return(0);
2756        }
2757
2758        /* Sleep until working thread thread signals that it has finished
2759         * its work and then return.  No need for this thread to poll;
2760         * the other thread may have already finished what we wanted.
2761         * This condition wait is used strictly as a best effort to
2762         * prevent busy spin.  We'll sort out the results later.
2763         */
2764        gettimeofday(&start, NULL);
2765        wait_time.tv_sec = start.tv_sec + max_idle_time / 1000;
2766        wait_time.tv_nsec = (start.tv_usec + ((max_idle_time % 1000)*1000))*1000;
2767        if (wait_time.tv_nsec > 1000000000)
2768        {
2769            wait_time.tv_nsec = wait_time.tv_nsec - 1000000000;
2770            wait_time.tv_sec++;
2771        }
2772        gen_cond_timedwait(&interface_cond, &interface_mutex, &wait_time);
2773        return(0);
2774    }
2775
2776    /* this thread has gained control of the polling.  */
2777    sc_test_busy = 1;
2778    gen_mutex_unlock(&interface_mutex);
2779
2780    /* our turn to look at the socket collection */
2781    ret = BMI_socket_collection_testglobal(tcp_socket_collection_p,
2782                                       TCP_WORK_METRIC, &socket_count,
2783                                       addr_array, status_array,
2784                                       max_idle_time);
2785
2786    gen_mutex_lock(&interface_mutex);
2787    sc_test_busy = 0;
2788
2789    if (ret < 0)
2790    {
2791        /* wake up anyone else who might have been waiting */
2792        gen_cond_broadcast(&interface_cond);
2793        PVFS_perror_gossip("Error: socket collection:", ret);
2794        /* BMI_socket_collection_testglobal() returns BMI error code */
2795        return (ret);
2796    }
2797
2798    if(socket_count == 0)
2799        busy_flag = 0;
2800
2801    /* do different kinds of work depending on results */
2802    for (i = 0; i < socket_count; i++)
2803    {
2804        tcp_addr_data = addr_array[i]->method_data;
2805        /* skip working on addresses in failure mode */
2806        if(tcp_addr_data->addr_error)
2807        {
2808            /* addr_error field is in BMI error code format */
2809            tcp_forget_addr(addr_array[i], 0, tcp_addr_data->addr_error);
2810            continue;
2811        }
2812
2813        if (status_array[i] & SC_ERROR_BIT)
2814        {
2815            ret = tcp_do_work_error(addr_array[i]);
2816            if (ret < 0)
2817            {
2818                PVFS_perror_gossip("Warning: BMI error handling failure, continuing", ret);
2819            }
2820        }
2821        else
2822        {
2823            if (status_array[i] & SC_WRITE_BIT)
2824            {
2825                ret = tcp_do_work_send(addr_array[i], &stall_flag);
2826                if (ret < 0)
2827                {
2828                    PVFS_perror_gossip("Warning: BMI send error, continuing", ret);
2829                }
2830                if(!stall_flag)
2831                    busy_flag = 0;
2832            }
2833            if (status_array[i] & SC_READ_BIT)
2834            {
2835                ret = tcp_do_work_recv(addr_array[i], &stall_flag);
2836                if (ret < 0)
2837                {
2838                    PVFS_perror_gossip("Warning: BMI recv error, continuing", ret);
2839                }
2840                if(!stall_flag)
2841                    busy_flag = 0;
2842            }
2843        }
2844    }
2845
2846    /* IMPORTANT NOTE: if we have set the following flag, then it indicates that
2847     * poll() is finding data on our sockets, yet we are not able to move
2848     * any of it right now.  This means that the sockets are backlogged, and
2849     * BMI is in danger of busy spinning during test functions.  Let's sleep
2850     * for a millisecond here in hopes of letting the rest of the system
2851     * catch up somehow (either by clearing a backlog in another I/O
2852     * component, or by posting more matching BMI recieve operations)
2853     */
2854    if(busy_flag)
2855    {
2856        req.tv_sec = 0;
2857        req.tv_nsec = 1000;
2858        gen_mutex_unlock(&interface_mutex);
2859        nanosleep(&req, NULL);
2860        gen_mutex_lock(&interface_mutex);
2861    }
2862
2863    /* wake up anyone else who might have been waiting */
2864    gen_cond_broadcast(&interface_cond);
2865    return (0);
2866}
2867
2868
2869/* tcp_do_work_send()
2870 *
2871 * does work on a TCP address that is ready to send data.
2872 *
2873 * returns 0 on success, -errno on failure
2874 */
2875static int tcp_do_work_send(bmi_method_addr_p map, int* stall_flag)
2876{
2877    method_op_p active_method_op = NULL;
2878    struct op_list_search_key key;
2879    int blocked_flag = 0;
2880    int ret = 0;
2881    int tmp_stall_flag;
2882
2883    *stall_flag = 1;
2884
2885    while (blocked_flag == 0 && ret == 0)
2886    {
2887        /* what we want to do here is find the first operation in the send
2888         * queue for this address.
2889         */
2890        memset(&key, 0, sizeof(struct op_list_search_key));
2891        key.method_addr = map;
2892        key.method_addr_yes = 1;
2893        active_method_op = op_list_search(op_list_array[IND_SEND], &key);
2894        if (!active_method_op)
2895        {
2896            /* ran out of queued sends to work on */
2897            return (0);
2898        }
2899
2900        ret = work_on_send_op(active_method_op, &blocked_flag, &tmp_stall_flag);
2901        if(!tmp_stall_flag)
2902            *stall_flag = 0;
2903    }
2904
2905    return (ret);
2906}
2907
2908
2909/* handle_new_connection()
2910 *
2911 * this function should be called only on special tcp method addresses
2912 * that represent local server ports.  It will attempt to accept a new
2913 * connection and create a new method address for the remote host.
2914 *
2915 * side effect: destroys the temporary method_address that is passed in
2916 * to it.
2917 *
2918 * returns 0 on success, -errno on failure
2919 */
2920static int handle_new_connection(bmi_method_addr_p map)
2921{
2922    struct tcp_addr *tcp_addr_data = NULL;
2923    int accepted_socket = -1;
2924    bmi_method_addr_p new_addr = NULL;
2925    int ret = -1;
2926    char* tmp_peer = NULL;
2927
2928    ret = tcp_accept_init(&accepted_socket, &tmp_peer);
2929    if (ret < 0)
2930    {
2931        return (ret);
2932    }
2933    if (accepted_socket < 0)
2934    {
2935        /* guess it wasn't ready after all */
2936        return (0);
2937    }
2938
2939    /* ok, we have a new socket.  what now?  Probably simplest
2940     * thing to do is to create a new method_addr, add it to the
2941     * socket collection, and return.  It will get caught the next
2942     * time around */
2943    new_addr = alloc_tcp_method_addr();
2944    if (!new_addr)
2945    {
2946        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
2947    }
2948    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
2949                  "Assigning socket %d to new method addr.\n",
2950                  accepted_socket);
2951    tcp_addr_data = new_addr->method_data;
2952    tcp_addr_data->socket = accepted_socket;
2953    tcp_addr_data->peer = tmp_peer;
2954    tcp_addr_data->peer_type = BMI_TCP_PEER_IP;
2955
2956    /* set a flag to make sure that we never try to reconnect this address
2957     * in the future
2958     */
2959    tcp_addr_data->dont_reconnect = 1;
2960    /* register this address with the method control layer */
2961    tcp_addr_data->bmi_addr = bmi_method_addr_reg_callback(new_addr);
2962    if (ret < 0)
2963    {
2964        tcp_shutdown_addr(new_addr);
2965        dealloc_tcp_method_addr(new_addr);
2966        dealloc_tcp_method_addr(map);
2967        return (ret);
2968    }
2969    BMI_socket_collection_add(tcp_socket_collection_p, new_addr);
2970
2971    dealloc_tcp_method_addr(map);
2972    return (0);
2973
2974}
2975
2976
2977/* tcp_do_work_recv()
2978 *
2979 * does work on a TCP address that is ready to recv data.
2980 *
2981 * returns 0 on success, -errno on failure
2982 */
2983static int tcp_do_work_recv(bmi_method_addr_p map, int* stall_flag)
2984{
2985
2986    method_op_p active_method_op = NULL;
2987    int ret = -1;
2988    void *new_buffer = NULL;
2989    struct op_list_search_key key;
2990    struct tcp_msg_header new_header;
2991    struct tcp_addr *tcp_addr_data = map->method_data;
2992    struct tcp_op *tcp_op_data = NULL;
2993    int tmp_errno;
2994    int tmp;
2995    bmi_size_t old_amt_complete = 0;
2996    time_t current_time;
2997
2998    *stall_flag = 1;
2999
3000    /* figure out if this is a new connection */
3001    if (tcp_addr_data->server_port)
3002    {
3003        /* just try to accept connection- no work yet */
3004        *stall_flag = 0;
3005        return (handle_new_connection(map));
3006    }
3007
3008    /* look for a recv for this address that is already in flight */
3009    active_method_op = find_recv_inflight(map);
3010    /* see if we found one in progress... */
3011    if (active_method_op)
3012    {
3013        tcp_op_data = active_method_op->method_data;
3014        if (active_method_op->mode == TCP_MODE_REND &&
3015            tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
3016        {
3017            /* we must wait for recv post */
3018            return (0);
3019        }
3020        else
3021        {
3022            old_amt_complete = active_method_op->amt_complete;
3023            ret = work_on_recv_op(active_method_op, stall_flag);
3024            gossip_debug(GOSSIP_BMI_DEBUG_TCP, "actual_size=%d, "
3025                         "amt_complete=%d, old_amt_complete=%d\n",
3026                         (int)active_method_op->actual_size,
3027                         (int)active_method_op->amt_complete,
3028                         (int)old_amt_complete);
3029
3030            if ((ret == 0) &&
3031                (old_amt_complete == active_method_op->amt_complete) &&
3032                active_method_op->actual_size &&
3033                (active_method_op->amt_complete <
3034                 active_method_op->actual_size))
3035            {
3036                gossip_debug(
3037                    GOSSIP_BMI_DEBUG_TCP, "Warning: bmi_tcp unable "
3038                    "to recv any data reported by poll(). [1]\n");
3039
3040                if (tcp_addr_data->zero_read_limit++ ==
3041                    BMI_TCP_ZERO_READ_LIMIT)
3042                {
3043                    gossip_debug(GOSSIP_BMI_DEBUG_TCP,
3044                                 "...dropping connection.\n");
3045                    tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3046                }
3047            }
3048            else
3049            {
3050                tcp_addr_data->zero_read_limit = 0;
3051            }
3052            return(ret);
3053        }
3054    }
3055
3056    /* let's see if a the entire header is ready to be received.  If so
3057     * we will go ahead and pull it.  Otherwise, we will try again later.
3058     * It isn't worth the complication of reading only a partial message
3059     * header - we really want it atomically
3060     */
3061    ret = BMI_sockio_nbpeek(tcp_addr_data->socket,
3062                            new_header.enc_hdr, TCP_ENC_HDR_SIZE);
3063    if (ret < 0)
3064    {
3065        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-errno));
3066        return (0);
3067    }
3068
3069    if (ret == 0)
3070    {
3071        gossip_debug(
3072            GOSSIP_BMI_DEBUG_TCP, "Warning: bmi_tcp unable "
3073            "to recv any data reported by poll(). [2]\n");
3074
3075        if (tcp_addr_data->zero_read_limit++ ==
3076            BMI_TCP_ZERO_READ_LIMIT)
3077        {
3078            gossip_debug(GOSSIP_BMI_DEBUG_TCP,
3079                         "...dropping connection.\n");
3080            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3081        }
3082        return(0);
3083    }
3084    else
3085    {
3086        tcp_addr_data->zero_read_limit = 0;
3087    }
3088
3089    if (ret < TCP_ENC_HDR_SIZE)
3090    {
3091        current_time = time(NULL);
3092        if(!tcp_addr_data->short_header_timer)
3093        {
3094            tcp_addr_data->short_header_timer = current_time;
3095        }
3096        else if((current_time - tcp_addr_data->short_header_timer) >
3097            BMI_TCP_HEADER_WAIT_SECONDS)
3098        {
3099            gossip_err("Error: incomplete BMI TCP header after %d seconds, closing connection.\n",
3100                BMI_TCP_HEADER_WAIT_SECONDS);
3101            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3102            return (0);
3103        }
3104
3105        /* header not ready yet, but we will keep hoping */
3106        return (0);
3107    }
3108
3109    tcp_addr_data->short_header_timer = 0;
3110    *stall_flag = 0;
3111    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Reading header for new op.\n");
3112    ret = BMI_sockio_nbrecv(tcp_addr_data->socket,
3113                           new_header.enc_hdr, TCP_ENC_HDR_SIZE);
3114    if (ret < TCP_ENC_HDR_SIZE)
3115    {
3116        tmp_errno = errno;
3117        gossip_err("Error: BMI_sockio_nbrecv: %s\n", strerror(tmp_errno));
3118        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-tmp_errno));
3119        return (0);
3120    }
3121
3122    /* decode the header */
3123    BMI_TCP_DEC_HDR(new_header);
3124
3125    /* so we have the header. now what?  These are the possible
3126     * scenarios:
3127     * a) unexpected message
3128     * b) eager message for which a recv has been posted
3129     * c) eager message for which a recv has not been posted
3130     * d) rendezvous messsage for which a recv has been posted
3131     * e) rendezvous messsage for which a recv has not been posted
3132     * f) eager message for which a rend. recv has been posted
3133     */
3134
3135    /* check magic number of message */
3136    if(new_header.magic_nr != BMI_MAGIC_NR)
3137    {
3138        gossip_err("Error: bad magic in BMI TCP message.\n");
3139        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EBADMSG));
3140        return(0);
3141    }
3142
3143    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Received new message; mode: %d.\n",
3144                  (int) new_header.mode);
3145    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "tag: %d\n", (int) new_header.tag);
3146
3147    if (new_header.mode == TCP_MODE_UNEXP)
3148    {
3149        /* allocate the operation structure */
3150        active_method_op = alloc_tcp_method_op();
3151        if (!active_method_op)
3152        {
3153            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3154            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3155        }
3156        /* create data buffer */
3157        new_buffer = malloc(new_header.size);
3158        if (!new_buffer)
3159        {
3160            dealloc_tcp_method_op(active_method_op);
3161            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3162            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3163        }
3164
3165        /* set the fields */
3166        active_method_op->send_recv = BMI_RECV;
3167        active_method_op->addr = map;
3168        active_method_op->actual_size = new_header.size;
3169        active_method_op->expected_size = 0;
3170        active_method_op->amt_complete = 0;
3171        active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3172        active_method_op->msg_tag = new_header.tag;
3173        active_method_op->buffer = new_buffer;
3174        active_method_op->mode = TCP_MODE_UNEXP;
3175        active_method_op->buffer_list = &(active_method_op->buffer);
3176        active_method_op->size_list = &(active_method_op->actual_size);
3177        active_method_op->list_count = 1;
3178        tcp_op_data = active_method_op->method_data;
3179        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3180        tcp_op_data->env = new_header;
3181
3182        op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3183        /* grab some data if we can */
3184        return (work_on_recv_op(active_method_op, &tmp));
3185    }
3186
3187    memset(&key, 0, sizeof(struct op_list_search_key));
3188    key.method_addr = map;
3189    key.method_addr_yes = 1;
3190    key.msg_tag = new_header.tag;
3191    key.msg_tag_yes = 1;
3192
3193    /* look for a match within the posted operations */
3194    active_method_op = op_list_search(op_list_array[IND_RECV], &key);
3195
3196    if (active_method_op)
3197    {
3198        /* make sure it isn't too big */
3199        if (new_header.size > active_method_op->expected_size)
3200        {
3201            gossip_err("Error: message ordering violation;\n");
3202            gossip_err("Error: message too large for next buffer.\n");
3203            gossip_err("Error: incoming size: %ld, expected size: %ld\n",
3204                        (long) new_header.size,
3205                        (long) active_method_op->expected_size);
3206            /* TODO: return error here or do something else? */
3207            return (bmi_tcp_errno_to_pvfs(-EPROTO));
3208        }
3209
3210        /* we found a match.  go work on it and return */
3211        op_list_remove(active_method_op);
3212        active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3213        active_method_op->actual_size = new_header.size;
3214        op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3215        return (work_on_recv_op(active_method_op, &tmp));
3216    }
3217
3218    /* no match anywhere.  Start a new operation */
3219    /* allocate the operation structure */
3220    active_method_op = alloc_tcp_method_op();
3221    if (!active_method_op)
3222    {
3223        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3224        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3225    }
3226
3227    if (new_header.mode == TCP_MODE_EAGER)
3228    {
3229        /* create data buffer for eager messages */
3230        new_buffer = malloc(new_header.size);
3231        if (!new_buffer)
3232        {
3233            dealloc_tcp_method_op(active_method_op);
3234            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3235            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3236        }
3237    }
3238    else
3239    {
3240        new_buffer = NULL;
3241    }
3242
3243    /* set the fields */
3244    active_method_op->send_recv = BMI_RECV;
3245    active_method_op->addr = map;
3246    active_method_op->actual_size = new_header.size;
3247    active_method_op->expected_size = 0;
3248    active_method_op->amt_complete = 0;
3249    active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3250    active_method_op->msg_tag = new_header.tag;
3251    active_method_op->buffer = new_buffer;
3252    active_method_op->mode = new_header.mode;
3253    active_method_op->buffer_list = &(active_method_op->buffer);
3254    active_method_op->size_list = &(active_method_op->actual_size);
3255    active_method_op->list_count = 1;
3256    tcp_op_data = active_method_op->method_data;
3257    tcp_op_data->tcp_op_state = BMI_TCP_BUFFERING;
3258    tcp_op_data->env = new_header;
3259
3260    op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3261
3262    /* grab some data if we can */
3263    if (new_header.mode == TCP_MODE_EAGER)
3264    {
3265        return (work_on_recv_op(active_method_op, &tmp));
3266    }
3267
3268    return (0);
3269}
3270
3271
3272/*
3273 * work_on_send_op()
3274 *
3275 * used to perform work on a send operation.  this is called by the poll
3276 * function.
3277 *
3278 * sets blocked_flag if no more work can be done on socket without
3279 * blocking
3280 * returns 0 on success, -errno on failure.
3281 */
3282static int work_on_send_op(method_op_p my_method_op,
3283                           int *blocked_flag, int* stall_flag)
3284{
3285    int ret = -1;
3286    struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
3287    struct tcp_op *tcp_op_data = my_method_op->method_data;
3288
3289    *blocked_flag = 1;
3290    *stall_flag = 0;
3291
3292    /* make sure that the connection is done before we continue */
3293    if (tcp_addr_data->not_connected)
3294    {
3295        ret = tcp_sock_init(my_method_op->addr);
3296        if (ret < 0)
3297        {
3298            PVFS_perror_gossip("Error: socket failed to init", ret);
3299            /* tcp_sock_init() returns BMI error code */
3300            tcp_forget_addr(my_method_op->addr, 0, ret);
3301            return (0);
3302        }
3303        if (tcp_addr_data->not_connected)
3304        {
3305            /* try again later- still could not connect */
3306            tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3307            return (0);
3308        }
3309    }
3310
3311    ret = payload_progress(tcp_addr_data->socket,
3312        my_method_op->buffer_list,
3313        my_method_op->size_list,
3314        my_method_op->list_count,
3315        my_method_op->actual_size,
3316        &(my_method_op->list_index),
3317        &(my_method_op->cur_index_complete),
3318        BMI_SEND,
3319        tcp_op_data->env.enc_hdr,
3320        &my_method_op->env_amt_complete);
3321    if (ret < 0)
3322    {
3323        PVFS_perror_gossip("Error: payload_progress", ret);
3324        /* payload_progress() returns BMI error codes */
3325        tcp_forget_addr(my_method_op->addr, 0, ret);
3326        return (0);
3327    }
3328
3329    if(ret == 0)
3330        *stall_flag = 1;
3331
3332    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
3333    my_method_op->amt_complete += ret;
3334    assert(my_method_op->amt_complete <= my_method_op->actual_size);
3335
3336    if (my_method_op->amt_complete == my_method_op->actual_size && my_method_op->env_amt_complete == TCP_ENC_HDR_SIZE)
3337    {
3338        /* we are done */
3339        my_method_op->error_code = 0;
3340        BMI_socket_collection_remove_write_bit(tcp_socket_collection_p,
3341                                           my_method_op->addr);
3342        op_list_remove(my_method_op);
3343        ((struct tcp_op*)(my_method_op->method_data))->tcp_op_state =
3344            BMI_TCP_COMPLETE;
3345        op_list_add(completion_array[my_method_op->context_id], my_method_op);
3346        *blocked_flag = 0;
3347    }
3348    else
3349    {
3350        /* there is still more work to do */
3351        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3352    }
3353
3354    return (0);
3355}
3356
3357
3358/*
3359 * work_on_recv_op()
3360 *
3361 * used to perform work on a recv operation.  this is called by the poll
3362 * function.
3363 * NOTE: this function assumes the method header has already been read.
3364 *
3365 * returns 0 on success, -errno on failure.
3366 */
3367static int work_on_recv_op(method_op_p my_method_op, int* stall_flag)
3368{
3369
3370    int ret = -1;
3371    struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
3372    struct tcp_op *tcp_op_data = my_method_op->method_data;
3373
3374    *stall_flag = 1;
3375
3376    if (my_method_op->actual_size != 0)
3377    {
3378        /* now let's try to recv some actual data */
3379        ret = payload_progress(tcp_addr_data->socket,
3380            my_method_op->buffer_list,
3381            my_method_op->size_list,
3382            my_method_op->list_count,
3383            my_method_op->actual_size,
3384            &(my_method_op->list_index),
3385            &(my_method_op->cur_index_complete),
3386            BMI_RECV,
3387            NULL,
3388            0);
3389        if (ret < 0)
3390        {
3391            PVFS_perror_gossip("Error: payload_progress", ret);
3392            /* payload_progress() returns BMI error codes */
3393            tcp_forget_addr(my_method_op->addr, 0, ret);
3394            return (0);
3395        }
3396    }
3397    else
3398    {
3399        ret = 0;
3400    }
3401
3402    if(ret > 0)
3403        *stall_flag = 0;
3404
3405    my_method_op->amt_complete += ret;
3406    assert(my_method_op->amt_complete <= my_method_op->actual_size);
3407
3408    if (my_method_op->amt_complete == my_method_op->actual_size)
3409    {
3410        /* we are done */
3411        op_list_remove(my_method_op);
3412        if (tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
3413        {
3414            /* queue up to wait on matching post recv */
3415            op_list_add(op_list_array[IND_RECV_EAGER_DONE_BUFFERING],
3416                        my_method_op);
3417        }
3418        else
3419        {
3420            my_method_op->error_code = 0;
3421            if (my_method_op->mode == TCP_MODE_UNEXP)
3422            {
3423                op_list_add(op_list_array[IND_COMPLETE_RECV_UNEXP],
3424                            my_method_op);
3425            }
3426            else
3427            {
3428                ((struct tcp_op*)(my_method_op->method_data))->tcp_op_state =
3429                    BMI_TCP_COMPLETE;
3430                op_list_add(completion_array[my_method_op->context_id], my_method_op);
3431            }
3432        }
3433    }
3434
3435    return (0);
3436}
3437
3438
3439/* tcp_do_work_error()
3440 *
3441 * handles a tcp address that has indicated an error during polling.
3442 *
3443 * returns 0 on success, -errno on failure
3444 */
3445static int tcp_do_work_error(bmi_method_addr_p map)
3446{
3447    struct tcp_addr *tcp_addr_data = NULL;
3448    int buf;
3449    int ret;
3450    int tmp_errno;
3451
3452    tcp_addr_data = map->method_data;
3453
3454    /* perform a read on the socket so that we can get a real errno */
3455    ret = read(tcp_addr_data->socket, &buf, sizeof(int));
3456    if (ret == 0)
3457        tmp_errno = EPIPE;  /* report other side closed socket with this */
3458    else
3459        tmp_errno = errno;
3460
3461    gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Error: bmi_tcp: %s\n",
3462      strerror(tmp_errno));
3463
3464    if (tcp_addr_data->server_port)
3465    {
3466        /* Ignore this and hope it goes away... we don't want to lose
3467         * our local socket */
3468        dealloc_tcp_method_addr(map);
3469        gossip_lerr("Warning: error polling on server socket, continuing.\n");
3470        return (0);
3471    }
3472
3473    if(tmp_errno == 0)
3474        tmp_errno = EPROTO;
3475
3476    tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-tmp_errno));
3477
3478    return (0);
3479}
3480
3481#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
3482/*
3483 * tcp_enable_trusted()
3484 * Ideally, this function should look up the security configuration of
3485 * the server and determines
3486 * if it needs to bind to any specific port locally or not..
3487 * For now look at the FIXME below.
3488 */
3489static int tcp_enable_trusted(struct tcp_addr *tcp_addr_data)
3490{
3491    /*
3492     * FIXME:
3493     * For now, there is no way for us to check if a given
3494     * server is actually using port protection or not.
3495     * For now we unconditionally use a trusted port range
3496     * as long as USE_TRUSTED is #defined.
3497     *
3498     * Although most of the time we expect users
3499     * to be using a range of 0-1024, it is hard to keep probing
3500     * until one gets a port in the range specified.
3501     * Hence this is a temporary fix. we will see if this
3502     * requirement even needs to be met at all.
3503     */
3504    static unsigned short my_requested_port = 1023;
3505    unsigned short my_local_port = 0;
3506    struct sockaddr_in my_local_sockaddr;
3507    socklen_t len = sizeof(struct sockaddr_in);
3508    memset(&my_local_sockaddr, 0, sizeof(struct sockaddr_in));
3509
3510    /* setup for a fast restart to avoid bind addr in use errors */
3511    if (BMI_sockio_set_sockopt(tcp_addr_data->socket, SO_REUSEADDR, 1) < 0)
3512    {
3513        gossip_lerr("Could not set SO_REUSEADDR on local socket (port %hd)\n", my_local_port);
3514    }
3515    if (BMI_sockio_bind_sock(tcp_addr_data->socket, my_requested_port) < 0)
3516    {
3517        gossip_lerr("Could not bind to local port %hd: %s\n",
3518                my_requested_port, strerror(errno));
3519    }
3520    else {
3521        my_requested_port--;
3522    }
3523    my_local_sockaddr.sin_family = AF_INET;
3524    if (getsockname(tcp_addr_data->socket,
3525                (struct sockaddr *)&my_local_sockaddr, &len) == 0)
3526    {
3527        my_local_port = ntohs(my_local_sockaddr.sin_port);
3528    }
3529    gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Bound locally to port: %hd\n", my_local_port);
3530    return 0;
3531}
3532
3533#endif
3534
3535#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
3536
3537static char *bad_errors[] = {
3538    "invalid network address",
3539    "invalid port",
3540    "invalid network address and port"
3541};
3542
3543/*
3544 * tcp_allow_trusted()
3545 * if trusted ports was enabled make sure
3546 * that we can accept a particular connection from a given
3547 * client
3548 */
3549static int tcp_allow_trusted(struct sockaddr_in *peer_sockaddr)
3550{
3551    char *peer_hostname = inet_ntoa(peer_sockaddr->sin_addr);
3552    unsigned short peer_port = ntohs(peer_sockaddr->sin_port);
3553    int   i, what_failed   = -1;
3554
3555    /* Don't refuse connects if there were any
3556     * parse errors or if it is not enabled in the config file
3557     */
3558    if (gtcp_allowed_connection->port_enforce == 0
3559            && gtcp_allowed_connection->network_enforce == 0)
3560    {
3561        return 0;
3562    }
3563    /* make sure that the client is within the allowed network */
3564    if (gtcp_allowed_connection->network_enforce == 1)
3565    {
3566        /* Always allow localhost to connect */
3567        if (ntohl(peer_sockaddr->sin_addr.s_addr) == INADDR_LOOPBACK)
3568        {
3569            goto port_check;
3570        }
3571        for (i = 0; i < gtcp_allowed_connection->network_count; i++)
3572        {
3573            /* check with all the masks */
3574            if ((peer_sockaddr->sin_addr.s_addr & gtcp_allowed_connection->netmask[i].s_addr)
3575                    != (gtcp_allowed_connection->network[i].s_addr & gtcp_allowed_connection->netmask[i].s_addr ))
3576            {
3577                continue;
3578            }
3579            else {
3580                goto port_check;
3581            }
3582        }
3583        /* not from a trusted network */
3584        what_failed = 0;
3585    }
3586port_check:
3587    /* make sure that the client port numbers are within specified limits */
3588    if (gtcp_allowed_connection->port_enforce == 1)
3589    {
3590        if (peer_port < gtcp_allowed_connection->ports[0]
3591                || peer_port > gtcp_allowed_connection->ports[1])
3592        {
3593            what_failed = (what_failed < 0) ? 1 : 2;
3594        }
3595    }
3596    /* okay, we are good to go */
3597    if (what_failed < 0)
3598    {
3599        return 0;
3600    }
3601    /* no good */
3602    gossip_err("Rejecting client %s on port %d: %s\n",
3603           peer_hostname, peer_port, bad_errors[what_failed]);
3604    return -1;
3605}
3606
3607#endif
3608
3609/*
3610 * tcp_accept_init()
3611 *
3612 * used to establish a connection from the server side.  Attempts an
3613 * accept call and provides the socket if it succeeds.
3614 *
3615 * returns 0 on success, -errno on failure.
3616 */
3617static int tcp_accept_init(int *socket, char** peer)
3618{
3619
3620    int ret = -1;
3621    int tmp_errno = 0;
3622    struct tcp_addr *tcp_addr_data = tcp_method_params.listen_addr->method_data;
3623    int oldfl = 0;
3624    struct sockaddr_in peer_sockaddr;
3625    int peer_sockaddr_size = sizeof(struct sockaddr_in);
3626    char* tmp_peer;
3627
3628    /* do we have a socket on this end yet? */
3629    if (tcp_addr_data->socket < 0)
3630    {
3631        ret = tcp_server_init();
3632        if (ret < 0)
3633        {
3634            return (ret);
3635        }
3636    }
3637
3638    *socket = accept(tcp_addr_data->socket, (struct sockaddr*)&peer_sockaddr,
3639              (socklen_t *)&peer_sockaddr_size);
3640
3641    if (*socket < 0)
3642    {
3643        if ((errno == EAGAIN) ||
3644            (errno == EWOULDBLOCK) ||
3645            (errno == ENETDOWN) ||
3646            (errno == EPROTO) ||
3647            (errno == ENOPROTOOPT) ||
3648            (errno == EHOSTDOWN) ||
3649            (errno == ENONET) ||
3650            (errno == EHOSTUNREACH) ||
3651            (errno == EOPNOTSUPP) ||
3652            (errno == ENETUNREACH) ||
3653            (errno == ENFILE) ||
3654            (errno == EMFILE))
3655        {
3656            /* try again later */
3657            if ((errno == ENFILE) || (errno == EMFILE))
3658            {
3659                gossip_err("Error: accept: %s (continuing)\n",strerror(errno));
3660                bmi_method_addr_drop_callback(BMI_tcp_method_name);
3661            }
3662            return (0);
3663        }
3664        else
3665        {
3666            gossip_err("Error: accept: %s\n", strerror(errno));
3667            return (bmi_tcp_errno_to_pvfs(-errno));
3668        }
3669    }
3670
3671#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
3672
3673    /* make sure that we are allowed to accept this connection */
3674    if (tcp_allow_trusted(&peer_sockaddr) < 0)
3675    {
3676        /* Force closure of the connection */
3677        close(*socket);
3678        return (bmi_tcp_errno_to_pvfs(-EACCES));
3679    }
3680
3681#endif
3682
3683    /* we accepted a new connection.  turn off Nagle's algorithm. */
3684    if (BMI_sockio_set_tcpopt(*socket, TCP_NODELAY, 1) < 0)
3685    {
3686        tmp_errno = errno;
3687        gossip_lerr("Error: failed to set TCP_NODELAY option.\n");
3688        close(*socket);
3689        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
3690    }
3691
3692    /* set it to non-blocking operation */
3693    oldfl = fcntl(*socket, F_GETFL, 0);
3694    if (!(oldfl & O_NONBLOCK))
3695    {
3696        fcntl(*socket, F_SETFL, oldfl | O_NONBLOCK);
3697    }
3698
3699    /* allocate ip address string */
3700    tmp_peer = inet_ntoa(peer_sockaddr.sin_addr);
3701    *peer = (char*)malloc(strlen(tmp_peer)+1);
3702    if(!(*peer))
3703    {
3704        close(*socket);
3705        return(bmi_tcp_errno_to_pvfs(-BMI_ENOMEM));
3706    }
3707    strcpy(*peer, tmp_peer);
3708
3709    return (0);
3710}
3711
3712
3713/* alloc_tcp_method_op()
3714 *
3715 * creates a new method op with defaults filled in for tcp.
3716 *
3717 * returns pointer to structure on success, NULL on failure
3718 */
3719static method_op_p alloc_tcp_method_op(void)
3720{
3721    method_op_p my_method_op = NULL;
3722
3723    my_method_op = bmi_alloc_method_op(sizeof(struct tcp_op));
3724
3725    /* we trust alloc_method_op to zero it out */
3726
3727    return (my_method_op);
3728}
3729
3730
3731/* dealloc_tcp_method_op()
3732 *
3733 * destroys an existing tcp method op, freeing segment lists if
3734 * needed
3735 *
3736 * no return value
3737 */
3738static void dealloc_tcp_method_op(method_op_p old_op)
3739{
3740    bmi_dealloc_method_op(old_op);
3741    return;
3742}
3743
3744/* tcp_post_send_generic()
3745 *
3746 * Submits send operations (low level).
3747 *
3748 * returns 0 on success that requires later poll, returns 1 on instant
3749 * completion, -errno on failure
3750 */
3751static int tcp_post_send_generic(bmi_op_id_t * id,
3752                                 bmi_method_addr_p dest,
3753                                 const void *const *buffer_list,
3754                                 const bmi_size_t *size_list,
3755                                 int list_count,
3756                                 enum bmi_buffer_type buffer_type,
3757                                 struct tcp_msg_header my_header,
3758                                 void *user_ptr,
3759                                 bmi_context_id context_id,
3760                                 PVFS_hint hints)
3761{
3762    struct tcp_addr *tcp_addr_data = dest->method_data;
3763    method_op_p query_op = NULL;
3764    int ret = -1;
3765    bmi_size_t total_size = 0;
3766    bmi_size_t amt_complete = 0;
3767    bmi_size_t env_amt_complete = 0;
3768    struct op_list_search_key key;
3769    int list_index = 0;
3770    bmi_size_t cur_index_complete = 0;
3771    PINT_event_id eid = 0;
3772
3773    if(PINT_EVENT_ENABLED)
3774    {
3775        int i = 0;
3776        for(; i < list_count; ++i)
3777        {
3778            total_size += size_list[i];
3779        }
3780    }
3781
3782    PINT_EVENT_START(
3783        bmi_tcp_send_event_id, bmi_tcp_pid, NULL, &eid,
3784        PINT_HINT_GET_CLIENT_ID(hints),
3785        PINT_HINT_GET_REQUEST_ID(hints),
3786        PINT_HINT_GET_RANK(hints),
3787        PINT_HINT_GET_HANDLE(hints),
3788        PINT_HINT_GET_OP_ID(hints),
3789        total_size);
3790
3791    /* Three things can happen here:
3792     * a) another op is already in queue for the address, so we just
3793     * queue up
3794     * b) we can send the whole message and return
3795     * c) we send part of the message and queue the rest
3796     */
3797
3798    /* NOTE: on the post_send side of an operation, it doesn't really
3799     * matter whether the op is going to be eager or rendezvous.  It is
3800     * handled the same way (except for how the header is filled in).
3801     * The difference is in the recv processing for TCP.
3802     */
3803
3804    /* NOTE: we also don't care what the buffer_type says, TCP could care
3805     * less what buffers it is using.
3806     */
3807
3808    /* encode the message header */
3809    BMI_TCP_ENC_HDR(my_header);
3810
3811    /* the first thing we must do is find out if another send is queued
3812     * up for this address so that we don't mess up our ordering.    */
3813    memset(&key, 0, sizeof(struct op_list_search_key));
3814    key.method_addr = dest;
3815    key.method_addr_yes = 1;
3816    query_op = op_list_search(op_list_array[IND_SEND], &key);
3817    if (query_op)
3818    {
3819        /* queue up operation */
3820        ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3821                                dest, (void **) buffer_list,
3822                                size_list, list_count, 0, 0,
3823                                id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3824                                my_header.size, 0,
3825                                context_id,
3826                                eid);
3827
3828        /* TODO: is this causing deadlocks?  See similar call in recv
3829         * path for another example.  This particular one seems to be an
3830         * issue under a heavy bonnie++ load that Neill has been
3831         * debugging.  Comment out for now to see if the problem goes
3832         * away.
3833         */
3834#if 0
3835        if (ret >= 0)
3836        {
3837            /* go ahead and try to do some work while we are in this
3838             * function since we appear to be backlogged.  Make sure that
3839             * we do not wait in the poll, however.
3840             */
3841            ret = tcp_do_work(0);
3842        }
3843#endif
3844        if (ret < 0)
3845        {
3846            gossip_err("Error: enqueue_operation() or tcp_do_work() returned: %d\n", ret);
3847        }
3848        return (ret);
3849    }
3850
3851    /* make sure the connection is established */
3852    ret = tcp_sock_init(dest);
3853    if (ret < 0)
3854    {
3855        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "tcp_sock_init() failure.\n");
3856        /* tcp_sock_init() returns BMI error code */
3857        tcp_forget_addr(dest, 0, ret);
3858        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, 0, ret);
3859        return (ret);
3860    }
3861
3862    tcp_addr_data = dest->method_data;
3863
3864#if 0
3865    /* TODO: this is a hack for testing! */
3866    /* disables immediate send completion... */
3867    ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3868                            dest, buffer_list, size_list, list_count, 0, 0,
3869                            id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3870                            my_header.size, 0,
3871                            context_id);
3872    return(ret);
3873#endif
3874
3875    if (tcp_addr_data->not_connected)
3876    {
3877        /* if the connection is not completed, queue up for later work */
3878        ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3879                                dest, (void **) buffer_list, size_list,
3880                                list_count, 0, 0,
3881                                id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3882                                my_header.size, 0,
3883                                context_id,
3884                                eid);
3885        if(ret < 0)
3886        {
3887            gossip_err("Error: enqueue_operation() returned: %d\n", ret);
3888        }
3889        return (ret);
3890    }
3891
3892    /* try to send some data */
3893    env_amt_complete = 0;
3894    ret = payload_progress(tcp_addr_data->socket,
3895        (void **) buffer_list,
3896        size_list, list_count, my_header.size, &list_index,
3897        &cur_index_complete, BMI_SEND, my_header.enc_hdr, &env_amt_complete);
3898    if (ret < 0)
3899    {
3900        PVFS_perror_gossip("Error: payload_progress", ret);
3901        /* payload_progress() returns BMI error codes */
3902        tcp_forget_addr(dest, 0, ret);
3903        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, eid, 0, ret);
3904        return (ret);
3905    }
3906
3907    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
3908    amt_complete = ret;
3909    assert(amt_complete <= my_header.size);
3910    if (amt_complete == my_header.size && env_amt_complete == TCP_ENC_HDR_SIZE)
3911    {
3912        /* we are already done */
3913        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid,
3914                       NULL, eid, 0, amt_complete);
3915        return (1);
3916    }
3917
3918    /* queue up the remainder */
3919    ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3920                            dest, (void **) buffer_list,
3921                            size_list, list_count,
3922                            amt_complete, env_amt_complete, id,
3923                            BMI_TCP_INPROGRESS, my_header, user_ptr,
3924                            my_header.size, 0, context_id, eid);
3925
3926    if(ret < 0)
3927    {
3928        gossip_err("Error: enqueue_operation() returned: %d\n", ret);
3929    }
3930    return (ret);
3931}
3932
3933
3934/* payload_progress()
3935 *
3936 * makes progress on sending/recving data payload portion of a message
3937 *
3938 * returns amount completed on success, -errno on failure
3939 */
3940static int payload_progress(int s, void *const *buffer_list, const bmi_size_t*
3941    size_list, int list_count, bmi_size_t total_size, int* list_index,
3942    bmi_size_t* current_index_complete, enum bmi_op_type send_recv,
3943    char* enc_hdr, bmi_size_t* env_amt_complete)
3944{
3945    int i;
3946    int count = 0;
3947    int ret;
3948    int completed;
3949    /* used for finding the stopping point on short receives */
3950    int final_index = list_count-1;
3951    bmi_size_t final_size = size_list[list_count-1];
3952    bmi_size_t sum = 0;
3953    int vector_index = 0;
3954    int header_flag = 0;
3955    int tmp_env_done = 0;
3956
3957    if(send_recv == BMI_RECV)
3958    {
3959        /* find out if we should stop short in list processing */
3960        for(i=0; i<list_count; i++)
3961        {
3962            sum += size_list[i];
3963            if(sum >= total_size)
3964            {
3965                final_index = i;
3966                final_size = size_list[i] - (sum-total_size);
3967                break;
3968            }
3969        }
3970    }
3971
3972    assert(list_count > *list_index);
3973
3974    /* make sure we don't overrun our preallocated iovec array */
3975    if((list_count - (*list_index)) > BMI_TCP_IOV_COUNT)
3976    {
3977        list_count = (*list_index) + BMI_TCP_IOV_COUNT;
3978    }
3979
3980    /* do we need to send any of the header? */
3981    if(send_recv == BMI_SEND && *env_amt_complete < TCP_ENC_HDR_SIZE)
3982    {
3983        stat_io_vector[vector_index].iov_base = &enc_hdr[*env_amt_complete];
3984        stat_io_vector[vector_index].iov_len = TCP_ENC_HDR_SIZE - *env_amt_complete;
3985        count++;
3986        vector_index++;
3987        header_flag = 1;
3988    }
3989
3990    /* setup vector */
3991    stat_io_vector[vector_index].iov_base =
3992        (char*)buffer_list[*list_index] + *current_index_complete;
3993    count++;
3994    if(final_index == 0)
3995    {
3996        stat_io_vector[vector_index].iov_len = final_size - *current_index_complete;
3997    }
3998    else
3999    {
4000        stat_io_vector[vector_index].iov_len =
4001            size_list[*list_index] - *current_index_complete;
4002        for(i = (*list_index + 1); i < list_count; i++)
4003        {
4004            vector_index++;
4005            count++;
4006            stat_io_vector[vector_index].iov_base = buffer_list[i];
4007            if(i == final_index)
4008            {
4009                stat_io_vector[vector_index].iov_len = final_size;
4010                break;
4011            }
4012            else
4013            {
4014                stat_io_vector[vector_index].iov_len = size_list[i];
4015            }
4016        }
4017    }
4018
4019    assert(count > 0);
4020
4021    if(send_recv == BMI_RECV)
4022    {
4023        ret = BMI_sockio_nbvector(s, stat_io_vector, count, 1);
4024    }
4025    else
4026    {
4027        ret = BMI_sockio_nbvector(s, stat_io_vector, count, 0);
4028    }
4029
4030    /* if error or nothing done, return now */
4031    if(ret == 0)
4032        return(0);
4033    if(ret <= 0)
4034        return(bmi_tcp_errno_to_pvfs(-errno));
4035
4036    completed = ret;
4037    if(header_flag && (completed >= 0))
4038    {
4039        /* take care of completed header status */
4040        tmp_env_done = TCP_ENC_HDR_SIZE - *env_amt_complete;
4041        if(tmp_env_done > completed)
4042            tmp_env_done = completed;
4043        completed -= tmp_env_done;
4044        ret -= tmp_env_done;
4045        (*env_amt_complete) += tmp_env_done;
4046    }
4047
4048    i=header_flag;
4049    while(completed > 0)
4050    {
4051        /* take care of completed data payload */
4052        if(completed >= stat_io_vector[i].iov_len)
4053        {
4054            completed -= stat_io_vector[i].iov_len;
4055            *current_index_complete = 0;
4056            (*list_index)++;
4057            i++;
4058        }
4059        else
4060        {
4061            *current_index_complete += completed;
4062            completed = 0;
4063        }
4064    }
4065
4066    return(ret);
4067}
4068
4069static void bmi_set_sock_buffers(int socket){
4070        //Set socket buffer sizes:
4071        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Default socket buffers send:%d receive:%d\n",
4072                GET_SENDBUFSIZE(socket), GET_RECVBUFSIZE(socket));
4073        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Setting socket buffer size for send:%d receive:%d \n",
4074                tcp_buffer_size_send, tcp_buffer_size_receive);
4075    if( tcp_buffer_size_receive != 0)
4076         SET_RECVBUFSIZE(socket,tcp_buffer_size_receive);
4077    if( tcp_buffer_size_send != 0)
4078         SET_SENDBUFSIZE(socket,tcp_buffer_size_send);
4079        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Reread socket buffers send:%d receive:%d\n",
4080                GET_SENDBUFSIZE(socket), GET_RECVBUFSIZE(socket));
4081}
4082
4083/*
4084 * Local variables:
4085 *  c-indent-level: 4
4086 *  c-basic-offset: 4
4087 * End:
4088 *
4089 * vim: ts=8 sts=4 sw=4 expandtab
4090 */
Note: See TracBrowser for help on using the browser.