/* * These are the library routines for read,write,wait,mpi thing. * Author: Sarah Anderson sea@lcse.umn.edu * * University of Minnesota / LCSE * January 2002 * * Much Adapted from earlier code by * Thom. M. Ruwart * */ #ifndef WIN32 /* UNIXisms */ #include #include #include #include #include #include #include #include #include #include #include #include #define HANDLE int #define DWORD int #define WINAPI #define INVALID_HANDLE_VALUE -1 #define SLASH "/" #define CSLASH '/' #define FOFFSET unsigned #define _ASSERTE assert #define CLOSEFILE close #include #define Sleep(n) usleep(n*10) #else /* NTisms */ #include "mpi.h" #include #include #include /* for _ASSERTE */ #include #include #include #include #include #include #include #include #include #include #include "mmsystem.h" #define SLASH "\\" #define CSLASH '\\' #define FOFFSET __int64 #define CLOSEFILE CloseHandle #endif #include "mpi.h" #include "gm.h" /* Local includes */ #include "thing.h" #include "iothing.h" #include "threadhelper.h" /* Rio usage */ int rio_write ( char *pathname, void *addr, unsigned nbytes, FOFFSET offset ); int rio_read ( char *pathname, void *addr, unsigned nbytes, FOFFSET offset ); int rio_writeh ( char *host, char *pathname, void *addr, unsigned nbytes, FOFFSET offset ); int rio_readh ( char *host, char *pathname, void *addr, unsigned nbytes, FOFFSET offset ); int rio_ping( char *hostname ); /* * The following is a set of queues that contain all the read and write * requests in their several "states". There are four basic types of lists * of iop_things: * 1 - Free list * 2 - Inbaskets for read and write requests waiting to be processed * 3 - InProcess lists for read and write operations In Process * * For example, when a program makes a READ request,it will call * Read_Thing () which will take the request and do the following: * - format the IOP structure with all the necessary data * - put the iop structure on the readInbasket list (enqueue_iop_thing) * - Release the Read_Thingy_ready_hsema semaphore to signal * Read_Thingy() that there is something to do. * - The Read_Thingy () will take the iop_thing off the * readInbasket and put it on the readInprocess list. (dequeue_iop_thing) * - Put the iop_thing being processed on the ReadInprocess queue (enqueue_iop_thing) * - Once the read operation has completed, the associated iop_thing * The same is true for the write process. */ /* G L O B A L S -------------------------------------------------*/ /* debugging only (in rio) */ extern FILE *Log; extern char *timestamp(); /* The queue of read requests */ static iop_queue_t readWriteBasket = {NULL,NULL,READWRITEBASKETMUTEX,"ReadWritebasket"}; /* The queue of read and write ops that are complete */ static work_queue_t Work_queue = {NULL,NULL,WORKQUEUEMUTEX,"WorkQueue" }; static work_queue_t WorkDone_queue = {NULL,NULL,WORKDONEQUEUEMUTEX,"WorkDoneQueue" }; static int Initialized = 0; /* 0=not initialized, 1=initialized */ static int SharedFileSystem= 0; /* created objects are sharable or not */ static long readwrite_thingy_handle; /* handle of the read_thingy */ static volatile DWORD readwrite_thingy_state = THING_STOPPED; static volatile DWORD mpi_thingy_state = THING_STOPPED; static long mpi_thingy_handle; /* handle of the mpi_thingy */ /* This is the root of the objects created by "create_thing" */ static char RootDirName[THING_NAME_LEN]; /* These are setup at start_thing */ static int Myid = -1; /* My node number. The TM is node '0', and CM's 1.. */ static int Nodes = 0; /* Number of participating MPI nodes */ /* Map from MPI node number to hostname - needed for rio call. This is also * built in start_thing. */ static char Hostnames[128][16]; /*---------------------------------------------------------------- * List of thing stuff. All these are protected by a write lock */ static int Nthings= 0; thing_t ListOfThings[MAX_THINGS]; #undef USEHASH #ifdef USEHASH static struct gm_hash* HashOfThings; #endif /* When user code (in start_thing or create_thing) makes a new object, the * birth announcement makes a complete circuit of all other nodes before the * user code returns. For now.... for large clusters this may become a * problem. */ static volatile int CreationNode= 0; /* This is outgoing message traffic for mpi_thingy, set up in add_thing */ static int SendNewThing; /* set in add_thing to cause mpi_thingy to * inject a new forwarded birth announcement */ thing_t NewThing; /* Pointer to the newthing to announce */ /*---------------------------------------------------------------- * Remote I/O (RIO) operation * * // the question to explore is multi-buffer RIO operations. WHat happens * if and when multiple TAG_RIOHEADER requests come into the remote node? * * * Raise SendRIO flag, (nonzero) signalling the presence of a filled-in * RIOthing structure. This structure is sent with a TAG_RIOHEADER message to * the remote node and SendRIO cleared to zero. This is the call initiated by * the user's code calling read/write_thing. * * On receipt of the TAG_RIOHEADER message, the remote node enqueues the IO * request. * * When the IO request is processed by readwrite_thingy, dowrite_thingy or * doread_thingy is called - as appropriate. So, from the disk-node thingy's * perspective: * * RIOWRITE: Since we need bytes from MPI: * 1) raise flag, having specified the remote address, * byte count and node. Sets , post an Irecv and then send the * struct with a TAG_RIOWRITE message to the originating node. * 2) The response to TAG_RIOWRITE should be a TAG_RIODATA packet, which * should get eaten by the Irecv. Immediately after that send, send * TAG_RIOEOD (EndOfData), which resets RIOWrite.pending and notifies the * disk process the bytes are there by releasing SEM_RIOMPI * * RIOREAD: We need to send bytes over MPI: * 1) Raise flag, which sends TAG_RIOREAD structure. Note * the interpretation of 'sourceAddr' and 'destAddr' is consistent, that is * the originating node (not the reader) now uses the destAddr. * Also set on the reading node. * 2) After the originating node posts its Irecv(), it sends TAG_RIOCTS (clearToSend), * which signals the reading node to immediately send its * TAG_RIODATA message. and clear RIOinfo.pending. * Once it is send, SEM_RIOMPI is released so the disk process can proceed and * reuse the source buffer for the next segment (if any). The whole process is repeated * (TAG_RIOREAD and all) for each buffer segment. */ typedef struct _RIO { void *sourceAddr; /* address from which bytes are read */ void *destAddr; /* address to which bytes are written */ unsigned nbytes; int node; /* node which is targeted by the disk (r/w) node */ int start; /* flag to start process in mpi_thingy in disk node */ int pending; /* consistency check - make sure tags are expected */ } RIO_t; /* This structure rolls up all the required RIO information. For a node * to satisfy a transfer over MPI all that is needed is the local memory address * (whether it be source or destination) and a byte count. */ static volatile RIO_t RIOinfo = {NULL,NULL,0,0,0,0}; /* First, the way for read_thing/write_thing to place a remote I/O * operation at the target node we have the originating mpi_thingy * send the message: */ static volatile int SendRIO=0; /* set in r/w_thing - 1 read, 2 write */ static thing_t RIOthing; /* These communicate between mpi_thingy and readwrite_thingy. */ static char* RIObuffer[2]={NULL,NULL}; /* double buffered RIO data 0:MPI, 1:IO */ static volatile int SendRIOstatus=0; static thing_t RIOstatus; /* status of RIO operation to be returned */ #define MAXRIOSIZE (4*1024*1024) /* number of bytes in a RIO piece */ /* These are the MPI message tags, see mpi_message_handler */ #define TAG_ASSIGNWORK 1 #define TAG_WORKDONE 2 #define TAG_WORKREQUEST 3 #define TAG_THINGCREATE 4 #define TAG_THINGREMOVE 5 #define TAG_RIOREAD 6 #define TAG_RIODATA 7 #define TAG_RIOCTS 8 #define TAG_RIOWRITE 9 #define TAG_RIOEOD 10 #define TAG_RIOSTATUS 11 #define TAG_RIOHEADER 12 /*================================================================ */ /* debugging pointer prints */ #define LOWEST(p) ((unsigned)((unsigned long)p & 0xFFFFFFFF)) #undef DBGRIO /* --------------------------------------------------------------- * Work assignment globals */ /* This is an outgoing message staged for SendingMessage */ static struct { int tag; /* 1 assignwork, ... */ int to; void *buffer; unsigned int nbytes; } Message; /* This message is staged for mpi_thingy to send... */ static volatile int SendingMessage= 0; /* when SendingMessage is nonzero. mpi_thingy is reset * when it is sent. */ static volatile int GotWorkID=1; /* reset when WorkID is expected in mpi_thingy */ static char WorkID[THING_MAX_WORKID]; /* mpi_thingy places incoming workids here */ /* ---------------------------------------------------------------- * Prototypes */ /* Debugging counter of status messages received for each node from * each node */ int Stat[64][64]= {0,} ; #ifdef WIN32 /*------------------------------------------------------------ * Handy version of 'perror' that gets the windows API error codes */ void error_thing( char *str, thing_t *status ) { char *string; FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, status->status, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR) &string, 0, NULL ); fprintf(stderr,"%s: %s\n", str, string); LocalFree( string ); return; } #else void error_thing( char *str, thing_t *status ) { fprintf(stderr,"%s: %s\n", str, strerror(status->status) ); } #endif /*------------------------------------------------------------*/ /* * This routine will initialize the read_thingy () and write_thingy () * daemons as well as all associated semaphores, queues, ..etc. * * This Routine must be called before any other routines are * invoked or something terrible will happen. * * The flag causes all objects to have * home nodes (if 0) or else all objects have home_node -1, * that is none. * * Return values: * -1 error, else * 0.. the communications node number of the participant. */ int start_thing(char *rootDir, int sharedFileSystem, int *argc, char ***argv) { int i; /* Your standard run-of-the-mill variable */ int ierr; int nchostname; char hostname[MPI_MAX_PROCESSOR_NAME]; /* Directory reading stuff */ #ifdef _WIN32 char findpat[256]; WIN32_FIND_DATA finddata; HANDLE dirhandle; #else #include #include DIR *dirhandle,*testhandle; struct dirent *finddata; char name[256]; #endif /* debugging only: */ int rio_start(int port, FILE *log); extern int Rio_Loglevel; #define GMCLIENTPORT 4 if (Initialized) { fprintf(stderr,"Start_Thing: Already initialized\n"); return -1; } /* passing in 0/NULL allows the caller to init. MPI itself */ if ( *argc && *argv != NULL ) { MPI_Init( argc, argv ); } MPI_Comm_size( MPI_COMM_WORLD, &Nodes ); MPI_Comm_rank( MPI_COMM_WORLD, &Myid ); MPI_Get_processor_name( hostname, &nchostname ); if ( nchostname > 16 ) { fprintf(stderr,"start_thing: hostname length >16 (%d): %s\n",nchostname,hostname); return -1; } MPI_Allgather( hostname,16,MPI_CHAR, Hostnames,16,MPI_CHAR, MPI_COMM_WORLD ); Rio_Loglevel= 2; if ( Myid > 0 ) { /* start rio clients for the CMs */ if ( !rio_start( GMCLIENTPORT,fopen("/scr/sanderso/t99/clientlog","w+") ) ) { fprintf(stderr,"Node %d cannot start rio on port %d\n", Myid, GMCLIENTPORT ); return -1; } } #if 0 if ( Myid == 1 ) { /* Let the 1st cm node (1), ensure all srio servers are up & running */ ierr= 0; for(i=1; id_type != DT_DIR ) { /* fprintf(stderr,"cm%d: file? '%s'\n", Myid, finddata->d_name ); */ add_thing (finddata->d_name, DISK_THING, Myid, 1 ); } #if 0 /* This code is needed for Irix, brain-damaged linuxii? */ /* If open as directory fails, this is a file. (sigh) */ strcpy(name,RootDirName); strcat(name,"/"); strcat(name,finddata->d_name); testhandle= opendir( name ); if ( testhandle == NULL ) add_thing (finddata->d_name, DISK_THING, Myid, 1 ); else closedir(testhandle); #endif } closedir( dirhandle ); #endif skipit: /* send end-of-list item */ NewThing.attributes= -1; SendNewThing= 1; CreationNode= Myid+1; while ( SendNewThing ) Sleep(2); /* Don't return until all forwarding for locally found names is done. */ fprintf(stderr," CreationNode=%d waiting for %d\n", CreationNode, Nodes ); /* Wait for All nodes to have checked in */ while ( CreationNode < Nodes ) Sleep(20); /* Cannot use mpi barrier since threads are doing recv/sends here */ /* MPI_Barrier( MPI_COMM_WORLD ); */ return(Myid); } /* end of start_thing() */ /*---------------------------------------------------------------- */ int getId_thing() { return Myid; } int getNodeCount_thing() { return Nodes; } /*---------------------------------------------------------------- * Set termination flags and notify all the helper threads to .. * go away. */ int stop_thing() { /* for now, for debugging, do not stop until all do */ MPI_Barrier(MPI_COMM_WORLD); readwrite_thingy_state = THING_STOPPING; t_release(SEM_READWRITE_THINGY); mpi_thingy_state= THING_STOPPING; while ( readwrite_thingy_state == THING_STOPPING ) Sleep(1); while ( mpi_thingy_state == THING_STOPPING ) Sleep(1); MPI_Finalize(); Initialized= 0; return 0; } /*----------------------------------------------------------- * Return when and only when this node has notified * all other nodes of its created things. This can be * called by any node - but only guarantees global name space * coherence if it is called by all creators. */ void synchAllCreated_thing() { } /*------------------------------------------------------------ * Returns milliseconds since Genesis */ unsigned long getMillisec() { #ifdef _WIN32 struct _timeb times; unsigned long timeNow; _ftime(×); timeNow = times.time*1000 + times.millitm; #else struct timeval times; unsigned long timeNow; gettimeofday( ×, NULL ); timeNow = times.tv_sec*1000 + times.tv_usec/1000; #endif return timeNow; } /*---------------------------------------------------------------- * MPI message handler. What to do with an incoming message will * get complex, but will depend on its source & tag which is in * . */ void mpi_message_handler( MPI_Status *mpistatus ) { /* Eat the messsage. Possible messages are TAG_* */ int source= mpistatus->MPI_SOURCE; int tag= mpistatus->MPI_TAG; int len= mpistatus->count; MPI_Status status; static MPI_Request request; int i, err, myerror; work_item_t *wp; RIO_t rio; thing_t thing, *iop, riostatus; thing_t *tp; /*fprintf(stderr,"***in handler source=%d, tag=%d, len=%d\n", source,tag,len ); */ if ( (tag==TAG_ASSIGNWORK || tag== TAG_WORKDONE || tag==TAG_WORKREQUEST) && len > THING_MAX_WORKID ) { fprintf(stderr,"Error: Bad workid size truncated to maximum of %d bytes\n", THING_MAX_WORKID ); len= THING_MAX_WORKID; } switch( tag ) { case TAG_ASSIGNWORK: /* assignwork() is talking to me.. */ /* wait first for getwork to pick up any previous copy of WorkId.. */ while (!GotWorkID) Sleep(1); GotWorkID= 0; MPI_Recv( WorkID, THING_MAX_WORKID, MPI_BYTE, source,tag, MPI_COMM_WORLD, &status ); /*fprintf(stderr,"received assignwork msg length %d from %d\n", len,source ); */ /* release the getwork which must be hanging here for work */ t_release( SEM_GETWORK ); return; case TAG_WORKDONE: /* workDone() is talking to me.. */ MPI_Recv( WorkID, THING_MAX_WORKID, MPI_BYTE, source,tag,MPI_COMM_WORLD, &status ); /*fprintf(stderr,"received workdone msg from %d\n", source ); */ /* place in queue for subsequent waitForWorker_thing calls */ wp= (work_item_t*)malloc( sizeof(work_item_t) ); wp->sender= source; wp->sizeofworkID= len; memcpy( wp->workID, WorkID, len ); wp->completed= 1; enqueue_work_thing( wp, &WorkDone_queue ); return; case TAG_WORKREQUEST: /* getWork() request */ MPI_Recv( WorkID, THING_MAX_WORKID, MPI_BYTE, source,tag,MPI_COMM_WORLD, &status ); /*fprintf(stderr,"received getwork req from %d\n", source );*/ /* place in queue for subsequent waitForWorker_thing calls */ wp= (work_item_t*)malloc( sizeof(work_item_t) ); wp->sender= source; wp->sizeofworkID= len; memcpy( wp->workID, WorkID, len ); wp->completed= 0; enqueue_work_thing( wp, &Work_queue ); return; case TAG_THINGCREATE: /* thing creation */ MPI_Recv( &thing, sizeof(thing_t), MPI_BYTE, source,tag,MPI_COMM_WORLD, &status ); /* The message will be a thing_t structure which we can use * to call create_thing locally. */ /* i= status.MPI_SOURCE; fprintf(stderr,"%d received thing %s from %d creation home_node=%d\n", Myid, thing.name,i, thing.home_node ); */ if ( thing.attributes == -1 ) { CreationNode++; /* = status.MPI_SOURCE+1; */ /* fprintf(stderr,"%d received eom thingcreate from %d, next sender node %d\n", Myid, status.MPI_SOURCE, CreationNode ); */ #if 0 if ( thing.home_node == Myid ) { /* welcome back */ t_release(SEM_ANNOUNCE); #endif } else { /* add to local table */ tp= add_thing (thing.name, thing.attributes, thing.home_node, 0 ); if ( tp== NULL ) fprintf(stderr, "Warning: duplicate local/nonlocal things named '%s' between nodes %d and %d\n", thing.name, thing.home_node, Myid ); #if 0 else { i= (Myid+1)%Nodes; err= MPI_Send(&thing,sizeof(thing_t),MPI_BYTE, i,TAG_THINGCREATE, MPI_COMM_WORLD ); if ( err != MPI_SUCCESS ) fprintf(stderr,"cm %d: MPI error (%d) in forwarding %s\n",Myid,err, thing.name); } #endif } return; case TAG_THINGREMOVE: /* thing removal */ MPI_Recv( &thing, sizeof(thing_t), MPI_BYTE, source,tag,MPI_COMM_WORLD, &status ); #if 0 if ( thing.home_node == Myid ) { /* welcome back - free user code */ t_release(SEM_ANNOUNCE); } else { #endif remove_thing( thing.name, 0 ); /* remove from local table */ /* MPI_Send( &thing, sizeof(thing_t), MPI_BYTE, (Myid+1)%Nodes, TAG_THINGREMOVE,MPI_COMM_WORLD ); */ return; case TAG_RIOHEADER: /* remote Read or Write */ iop= (thing_t*)malloc( sizeof(thing_t) ); MPI_Recv( iop, sizeof(thing_t), MPI_BYTE, source,TAG_RIOHEADER, MPI_COMM_WORLD, &status ); /* fprintf(stderr,"!!!!!!!!!!!!!!!!remote i/o '%s' requested of node %d type %d\n", iop->name, Myid, iop->RIO); */ _ASSERTE( iop->RIO ); iop->allocated= 2; #ifdef DBGRIO fprintf(stderr,"cm %d enqueuing RIO w/status %x\n", Myid, LOWEST(iop->origStatus) ); #endif /* So I have all I need to enqueue this. */ enqueue_iop_thing( iop, &readWriteBasket ); /* Signal readwrite_thingy () that something needs to be done */ t_release( SEM_READWRITE_THINGY ); return; case TAG_RIOWRITE: /* supply bytes to remote writer process */ MPI_Recv( &rio, sizeof(rio), MPI_BYTE, source,tag, MPI_COMM_WORLD, &status ); myerror=MPI_Send( rio.sourceAddr, rio.nbytes, MPI_BYTE, source,TAG_RIODATA, MPI_COMM_WORLD ); if ( myerror != MPI_SUCCESS ) { fprintf(stderr,"MPI RIO write send error from %d to %d\n", Myid,source); } MPI_Send( NULL,0,MPI_BYTE, source, TAG_RIOEOD, MPI_COMM_WORLD ); /* fprintf(stderr,"node %d send bytes & RIOEOD to %d\n", Myid, source ); */ return; case TAG_RIOEOD: /* riowrite: EndOfData: orig node has sent its data to our irecv */ MPI_Recv( NULL,0,MPI_BYTE, source, TAG_RIOEOD, MPI_COMM_WORLD, &status ); _ASSERTE( RIOinfo.pending != 0); RIOinfo.pending= 0; t_release( SEM_RIOMPI ); return; case TAG_RIOREAD: /* store bytes from remote reader process */ MPI_Recv( &rio, sizeof(rio), MPI_BYTE, source,tag, MPI_COMM_WORLD, &status ); MPI_Irecv( rio.destAddr, rio.nbytes, MPI_BYTE, source, TAG_RIODATA, MPI_COMM_WORLD, &request ); MPI_Request_free( &request ); MPI_Send( NULL,0, MPI_BYTE, source, TAG_RIOCTS, MPI_COMM_WORLD ); /* MPI_Recv( rio.destAddr, rio.nbytes, MPI_BYTE, source, TAG_RIODATA, MPI_COMM_WORLD, &status );*/ return; case TAG_RIOCTS: /* rioread: clearToSend: orig node has posted its irecv */ MPI_Recv( NULL,0, MPI_BYTE, source,tag, MPI_COMM_WORLD, &status ); _ASSERTE( RIOinfo.pending ); myerror=MPI_Send( RIOinfo.sourceAddr, RIOinfo.nbytes, MPI_BYTE, source, TAG_RIODATA, MPI_COMM_WORLD ); if ( myerror != MPI_SUCCESS ) { fprintf(stderr,"MPI CTS send error from %d to %d\n", Myid,source); } RIOinfo.pending= 0; t_release( SEM_RIOMPI ); /* notifying disk process */ return; case TAG_RIOSTATUS: /* result of the RIO operation */ MPI_Recv( &riostatus, sizeof(RIOstatus), MPI_BYTE, source,tag, MPI_COMM_WORLD, &status); /* we want to use the originally supplied pointer- all info supplied by the * RIO status, except the 'allocated' info which is not disturbed */ iop= riostatus.origStatus; i= iop->allocated; *iop= riostatus; iop->allocated= i; #ifdef DBGRIO Stat[Myid][source]++; err=0; for(i=0; inbytes_xferred ); #endif returnStatus( iop ); return; #if 0 case 8: /* RPC header */ MPI_Recv( &rpcHeader, 2*sizeof(int), MPI_INT, source,tag, MPI_COMM_WORLD, &status ); if ( rpcHeader.nbytes ) { data= (char*)malloc( rpcHeader.nbytes ); MPI_Recv( data, rpcHeader.nbytes, MPI_BYTE, source,9, MPI_COMM_WORLD, &status ); } else data= NULL; /* invoke */ fprintf(stderr,"Node %d Invoking rpc handler %d\n", Myid, rpcHeader.handler ); if ( data != NULL ) free(data); return; #endif default: fprintf(stderr,"Unexpected message tagged %d ignored\n", tag ); } return; } /*-------------------------------------------------------------------------------- * Message handler. */ DWORD WINAPI mpi_thingy(void) { int iflag, idle, node; MPI_Status mpistatus; static MPI_Request request; mpi_thingy_state= THING_RUNNING; while ( mpi_thingy_state == THING_RUNNING ) { idle= 1; /* first, drain the network */ do { MPI_Iprobe( MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &iflag, &mpistatus ); if ( iflag ) { /* deal wid it. */ mpi_message_handler( &mpistatus ); idle= 0; } } while (iflag); if ( idle ) Sleep(100); /* Check for outgoing traffic for work primitives */ if ( SendingMessage ) { /* fprintf(stderr,"sending message tagged %d to %d\n", Message.tag, Message.to );*/ MPI_Send( Message.buffer, Message.nbytes, MPI_BYTE, Message.to, Message.tag, MPI_COMM_WORLD ); SendingMessage= 0; idle= 0; } /* Outgoing traffic to make birth(create) announcements. */ if ( SendNewThing ) { for (node=0; nodeworkID, item->sizeofworkID ); node= item->sender; free(item); return node; } if ( timeOut == 0 ) return -1; Sleep(1); if ( timeOut < 0 ) if ( (getMillisec() - timeNow) > (unsigned long)timeOut ) return -1; } while(1); } /*------------------------------------------------------------ * * This routine is called by a user program to check for * work completion messages. * * It returns the id of a worker (0..) which is is * Signalling a workID (returned) is done. * * If timeOut expires (is 0 or milliseconds) with no action, returns -1 * */ int waitForWorkDone_thing( workID_thing_t task, int sizeOfWorkID, int timeOut ) { work_item_t *item; unsigned long timeNow; int node; timeNow= getMillisec(); do { /* get the top element off the work queue */ item= dequeue_work_thing( &WorkDone_queue ); if ( item != NULL ) { /* hallelujah*/ memcpy( task, item->workID, item->sizeofworkID ); node= item->sender; free(item); return node; } if ( timeOut == 0 ) return -1; Sleep(1); if ( timeOut < 0 ) if ( (getMillisec() - timeNow) > (unsigned long)timeOut ) return -1; } while(1); } /*---------------------------------------------------------------- * This routine is called by a user program to reply to a * request for work. */ int assignWork_thing( int workerNumber, workID_thing_t task, int sizeOfWorkID ) { if ( sizeOfWorkID > THING_MAX_WORKID ) return -1; /* we use the generic lock for outgoing work message traffic... */ t_lock(0); Message.tag= 1; Message.to= workerNumber; Message.nbytes= sizeOfWorkID; Message.buffer= task; SendingMessage= 1; /* wait for mpi_thingy to notice and send it */ while ( SendingMessage ) Sleep(5); t_unlock(0); return 0; } /*------------------------------------------------------------ * This routine is called by a user program to request work. * It blocks until such has been assigned. * * If is <0, do *not* send a message requesting work- * just wait for an assignment. */ int getWork_thing( int node, workID_thing_t task, int sizeOfWorkID ) { if ( node >= 0 ) { /* we use the generic lock for outgoing work message traffic... */ t_lock(0); Message.tag= 3; Message.to= node; Message.nbytes= sizeOfWorkID; Message.buffer= task; SendingMessage= 1; /* wait for mpi_thingy to notice and send it */ while ( SendingMessage ) Sleep(5); t_unlock(0); } t_wait(SEM_GETWORK); /* wait for mpi_thingy to release me on receipt of * assigned work */ memcpy( task, WorkID, sizeOfWorkID ); GotWorkID= 1; return 0; } /*---------------------------------------------------------------- * This routine is called by a user program, to inform someone * that some work assigned has been completed. *----------------------------------------------------------------*/ int workDone_thing( int assigner, workID_thing_t task, int sizeOfWorkID ) { /* if ( sizeOfWorkID > THING_MAX_WORKID ) return -1; */ /* we use the generic lock for work message traffic... */ t_lock(0); Message.tag= 2; Message.to= assigner; Message.nbytes= sizeOfWorkID; Message.buffer= task; SendingMessage= 1; /* wait for mpi_thingy to notice and send it */ while ( SendingMessage ) Sleep(5); t_unlock(0); return 0; } /*---------------------------------------------------------------- * Look up the thing on the global list. Returns NULL if not found, */ thing_t * find_thing( char *name ) { thing_t *tp; int i; #ifndef USEHASH for(i=0; iname) ) return tp; } return NULL; #else tp= (thing_t*)gm_hash_find( HashOfThings, name ); if ( tp==NULL ) { /* gmhash does not appear to work. punt. fprintf(stderr," gm_hash could not find '%s', searching..",name); */ /* try linear search, damnnit */ for(i=0; ihome_node, tp->name ); } } /*---------------------------------------------------------------- * Returns 0 if is not 1..Nthings * Otherwise, returns the length of the thingname, also setting the */ int getnameof_thing( int thingnumber, char *name, int *home ) { thing_t *tp; if ( thingnumber < 1 || thingnumber>Nthings ) return 0; /* tp= find_thing( NamesOfThings[thingnumber] ); if ( tp==NULL ) return 0; */ tp= &ListOfThings[thingnumber-1]; strcpy(name, tp->name ); *home= tp->home_node; return strlen(name); } /*---------------------------------------------------------------- * Add the thing named to the list of things, and if , * announce it to other nodes by starting (and completing) a forwarding chain. * * Returns NULL on error, else pointer to thing hashed thing table. */ thing_t* add_thing( char *name, int attrib, int homeNode, int announce ) { int it; thing_t anew, *tp, *tp2; if ( Nthings == MAX_THINGS ) { fprintf(stderr,"Error: Too many things (%d)\n", Nthings ); return NULL; } tp= find_thing(name); if ( tp != NULL ) { fprintf(stderr,"Error: cm %d: Thing \n",Myid); fprintf(stderr,"'%s' already exists, named '%s' home %d\n", name, tp->name, tp->home_node ); return NULL; } t_lock(THINGLISTMUTEX); /* is this necessary? Lets hope not. just defer incr until copy in is done. it= t_ifetchadd( Nthings, 1 ); */ it= Nthings; /* strncpy( NamesOfThings[it],name, THING_NAME_LEN ); */ anew.attributes= attrib; strncpy( anew.name, name, THING_NAME_LEN ); /* If this is a shared disk thing - set the home node as -1, meaning anyone * can directly read/write this object */ if ( SharedFileSystem ) anew.home_node= -1; else anew.home_node= homeNode; anew.localaddress= NULL; ListOfThings[it]= anew; tp= &ListOfThings[it]; ++Nthings; t_unlock(THINGLISTMUTEX); /* fprintf(stderr,"inserting into %p %s %p...,", ListOfThings, name, &anew );*/ #ifdef USEHASH if ( gm_hash_insert( HashOfThings, (void*)name, (void*)tp ) ) perror( "could not add to list of things"); #endif /* send off announcement to all other nodes.. * unless its home is '-1', that is, shared. */ if ( announce && anew.home_node>=0 ) { NewThing= anew; /* fprintf(stderr,"cm%d announcing '%s'\n", Myid, NewThing->name ); */ SendNewThing= 1; /* wait for mpi_thingy to pick it up and send it */ while ( SendNewThing ) Sleep(1); } /* fprintf(stderr," checking, looking up %s\n", name ); */ #ifdef USEHASH tp2= (thing_t*)gm_hash_find( HashOfThings, name ); if ( tp2 == NULL || tp!=tp2 ) fprintf(stderr," error: added, looking up %s found %p not %p\n", name, tp2, tp); #endif return tp; } /*---------------------------------------------------------------- * remove..not implemented * Returns -1 on error (impossible at the moment). * This should pass this fact on to other nodes. */ int remove_thing( char *name, int forwardit ) { #if 0 thing_t *tp; #ifndef USEHASH int ip, i; tp= find_thing(name); if ( tp== NULL) return 0; /* t_lock(THINGLISTMUTEX); */ for(i=ip; i, the object is created if nonexistent. * * Possible attributes should be 'or'd together: READ_ONLY_THING, * DISK_THING, etc.. * * Return values: -1 or -2 is bad, anything else is successful. * -2 Already exists * -1 Does not exist * ----------------------------------------------------------------*/ HANDLE open_thing(char *newname, thing_attributes_t attributes, int create ) { char name[THING_NAME_LEN]; int flags,flags2; thing_t *tp; /* thing pointer */ int i; if ( create ) { tp= find_thing(newname); if ( tp != NULL ) { if ( attributes & REPLACE_THING ) return 0; /* ok */ else return (HANDLE)-2; /* already exists */ } tp= add_thing (newname, attributes, Myid, 1 ); if ( tp == NULL ) return (HANDLE)-1; /* too many, or some other error */ } else { tp= find_thing(newname); if ( tp == NULL ) return (HANDLE)-1; /* does not exist! */ attributes= tp->attributes; } /* compose name as root directory name + this name */ strcpy(name,RootDirName); strcat(name,newname); /* fprintf(stderr, "Create_thing: '%s'\n", name ); fprintf(stderr, "RootDirName: '%s', newname='%s'\n", RootDirName,newname ); */ #ifdef WIN32 if ( attributes & READ_ONLY_THING ) flags= GENERIC_READ; else flags= GENERIC_READ | GENERIC_WRITE; /* flags2= FILE_FLAG_OVERLAPPED | FILE_FLAG_SEQUENTIAL_SCAN; */ flags2= FILE_FLAG_SEQUENTIAL_SCAN | FILE_FLAG_OVERLAPPED; /* if ( attributes & DIRECTIO_THING ) flags2|= FILE_FLAG_NO_BUFFERING; */ return CreateFile( name, flags, FILE_SHARE_READ, NULL, OPEN_ALWAYS, flags2, NULL ); #else if ( attributes & READ_ONLY_THING ) flags= O_RDONLY; else flags= O_RDWR|O_CREAT; return open( name, flags, 0644 ); #endif } /*------------------------------------------------------------------- * Return value: 0 OK, -1 error * Possible errors include * permissions violation (file system) * -2 duplicate name found (already created) * -1 does not exist */ int create_thing(char *newname, thing_attributes_t attributes, unsigned maxsize ) { HANDLE fd; int ifd; int zero=0; fd= open_thing(newname, attributes, 1 ); ifd= (int)fd; if ( ifd == -1 || ifd== -2) return ifd; #ifdef WIN32 if ( maxsize > 0 ) { SetFilePointer( fd, maxsize, NULL, FILE_BEGIN ); SetEndOfFile(fd); } CloseHandle(fd); #else if ( maxsize > sizeof(int) ) { lseek( fd, maxsize-sizeof(int), SEEK_SET ); write( fd, &zero, sizeof(int) ); } close(fd); #endif return 0; } /* end of create_thing() */ /*------------------------------------------------------------ * returns 0 if OK, else error code from deleting the disk object. */ int destroy_thing( char *newname ) { char name[THING_NAME_LEN]; int ierr; remove_thing(newname, 1); strcpy( name, RootDirName ); strcat( name, newname ); #ifdef WIN32 ierr= DeleteFile( name ); if ( !ierr ) return GetLastError(); else return 0; #else ierr= unlink(name); #endif return 0; } /*------------------------------------------------------------ * Read_thing creates an iop structure and enqueues it locally * (or with mpi_thingy's help, remotely) on the ReadWriteBasket. * * Return values: 0 is queued OK, -1 error. * * This routine is used by both the callback and status * returning versions of read_thing. */ int read_thyng( thing_t *thing, unsigned long offset, void *localAddress, unsigned nbytes, thing_t *status, void (*callback)(void *,void*), void *callback_data ) { thing_t *iopp; if ( status == NULL ) { iopp= (thing_t*)malloc( sizeof(thing_t) ); *iopp= *thing; iopp->allocated= 2; /* This is a callback notified iop. completion should free this. */ iopp->callback= callback; iopp->callback_data= callback_data; } else { iopp= status; *iopp= *thing; /* This is a simple iop transaction */ iopp->callback= NULL; iopp= status; iopp->allocated= 1; } /* fill in the iop structure */ iopp->offset = offset; iopp->nbytes = nbytes; iopp->localaddress = localAddress; iopp->status = 0; iopp->RIO= 0; iopp->rorw= 0; iopp->complete= 0; #if 0 /* If the thing is remote, send a message to the remote node * initiating the transfer. Note a home_node of -1 means the object * is on shared disk and hence 'local'. */ if ( thing->home_node == -1 || thing->home_node == Myid ) { /* Enqueue the iop_thing on the readInbasket queue */ enqueue_iop_thing(iopp,&readWriteBasket); /* Signal readwrite_thingy () that something needs to be done */ t_release( SEM_READWRITE_THINGY ); } else { /* We have a nonlocal read. Signal mpi_thingy to start the remote * read on another node. */ iopp->RIO = 1; iopp->origStatus= iopp; iopp->origNode= Myid; RIOthing= *iopp; SendRIO= 1; while (SendRIO) Sleep(1); /* wait for mpi_thingy to pick it up and send */ /* fprintf(stderr,"non local read of %s.. punted to %d\n",thing->name, thing->home_node ); */ /* fprintf(stderr,"non local read of %s.. punted to %d\n",thing->name, thing->home_node ); */ } #else /* Now all things are 'local' but still need the RIO flags set */ if ( thing->home_node != -1 && thing->home_node != Myid ) iopp->RIO= 1; /* Enqueue the iop_thing on the readInbasket queue */ enqueue_iop_thing(iopp,&readWriteBasket); /* Signal readwrite_thingy () that something needs to be done */ t_release( SEM_READWRITE_THINGY ); #endif return 0; } /* end of read_thyng */ /*------------------------------------------------------------ */ int read_thing(char *source, unsigned long offset, void *localAddress, unsigned nbytes, thing_t *status) { thing_t astatus; int ierr, ip; thing_t *tp; /* Lets go find it. */ tp= find_thing(source); if ( tp == NULL ) { fprintf(stderr,"Error: Can't find '%s' object to read\n", source ); return -1; } /* if we must supply the status, this is a blocking read */ if ( status == NULL ) { ierr= read_thyng(tp,offset,localAddress,nbytes,&astatus,NULL,NULL ); if ( ierr == -1 ) return -1; return wait_thing ( &astatus, TILDONE_THING ); } /* otherwise, pass it on */ return read_thyng( tp,offset,localAddress,nbytes,status,NULL,NULL ); } /*-------------------------------------------------------------- */ int readBack_thing( char *source, unsigned long offset, void *localAddress, unsigned nbytes, void (*callback)(void*,void*), void *callback_data ) { thing_t *tp; /* Lets go find it. */ tp= find_thing(source); if ( tp == NULL ) { fprintf(stderr,"Error: Can't find '%s' object to read\n", source ); return -1; } return read_thyng( tp,offset,localAddress,nbytes,NULL,callback,callback_data ); } /*------------------------------------------------------------ * Write_Thing will initiate the writing of a particular thing. * The name of the thing to write is passed in as an argument * along with the size of the thing to write, where to get * the thing in local memory, and a pointer to a location of * where to store the status upon completion of this write. * When a program makes a WRITE request, it will call Write_Thing () * which will take the request and do the following: * - format the IOP structure with all the necessary data * - put the iop structure on the writeInbasket list * - Release the Write_Thingy_ready_hsema semaphore to signal * Write_Thingy () that there is something to do. * * Return values: 0 is queued OK, -1 error, else nbytes transferred. * In the case of the thing to write residing on another node, * (Remote I/O == RIO), this starts a complicated chain of events. * * First, fields are filled in RIOheader, and the local mpi_thingy * is nudged by setting nonzero. Further events are * coordinated between the local mpi_thingy process, the remote * mpi_thingy and the remote write_thing process. This process * simply sleeps until the SEM_RIOWRITEDONE semaphore is released. */ int write_thyng(thing_t *thing, unsigned long offset, void *localAddress, unsigned nbytes, thing_t *status, void (*callback)(void*,void*), void *callback_data ) { thing_t *iopp; /* Pointer to an IOP Thing */ if ( status == NULL ) { iopp= (thing_t*)malloc( sizeof(thing_t) ); *iopp= *thing; iopp->allocated= 2; /* This is a callback notified iop. completion should free this. */ iopp->callback= callback; iopp->callback_data= callback_data; } else { /* This is a simple iop transaction */ iopp= status; *iopp= *thing; iopp->callback= NULL; iopp->allocated= 1; } /* fill in the iop structure */ iopp->offset = offset; iopp->nbytes = nbytes; iopp->localaddress = localAddress; iopp->status = 0; iopp->RIO= 0; iopp->rorw= 1; #if 0 /* If the thing is remote, send a message to the remote node * initiating the transfer. Note a home_node of -1 means the object * is on shared disk and hence 'local'. */ if ( thing->home_node == -1 || thing->home_node == Myid ) { /* Enqueue the iop_thing on the readInbasket queue */ enqueue_iop_thing(iopp,&readWriteBasket); /* Signal read_thingy () that something needs to be done */ t_release( SEM_READWRITE_THINGY ); } else { /* We have a nonlocal write. Signal mpi_thingy to enqueue the remote * write on another node. */ iopp->RIO= 2; iopp->origStatus= iopp; iopp->origNode= Myid; RIOthing= *iopp; SendRIO= 2; while (SendRIO) Sleep(1); /* wait for mpi_thingy to pick it up and send */ /* fprintf(stderr,"non local write of %s.. punted to %d\n", thing->name, thing->home_node ); */ } #else /* Now all things are 'local' but still need the RIO flags set */ if ( thing->home_node != -1 && thing->home_node != Myid ) iopp->RIO= 2; /* Enqueue the iop_thing on the readInbasket queue */ enqueue_iop_thing(iopp,&readWriteBasket); /* Signal read_thingy () that something needs to be done */ t_release( SEM_READWRITE_THINGY ); #endif return 0; } /* end of write_thyng() */ /*------------------------------------------------------------ */ void clearWait_thing( thing_t *id ) { id->complete= 0; id->allocated= 0; } void getIOstatistics_thing( thing_t *id, float *elapsedio, int *nbytes, int *remote ) { *elapsedio= id->elapsedio; *nbytes= id->nbytes_xferred; *remote= id->RIO; } /*------------------------------------------------------------ */ int write_thing(char *source, unsigned long offset, void *localAddress, unsigned nbytes, thing_t *status) { thing_t astatus; int ierr; thing_t *tp; /* Lets go find it. */ tp= find_thing(source); if ( tp== NULL ) { fprintf(stderr,"Error: can't find thing '%s'\n", source ); list_thing(); exit(1); return -1; } /* if we must supply the status, this is a blocking read */ if ( status == NULL ) { ierr= write_thyng(tp,offset,localAddress,nbytes,&astatus,NULL,NULL ); if ( ierr == -1 ) return -1; return wait_thing ( &astatus, TILDONE_THING ); } /* otherwise, pass it on */ return write_thyng( tp,offset,localAddress,nbytes,status,NULL,NULL ); } /*---------------------------------------------------------------- */ int writeBack_thing( char *source, unsigned long offset, void *localAddress, unsigned nbytes, void (*callback)(void*,void*), void *callback_data ) { thing_t *tp; /* Lets go find it. */ tp= find_thing(source); if ( tp == NULL ) { fprintf(stderr,"Error: can't find thing '%s'\n", source ); return -1; } return write_thyng( tp,offset,localAddress,nbytes,NULL,callback,callback_data ); } /*------------------------------------------------------------*/ /* * This routine will check the completion status of a read or * write operation on a specific thing. This routine will return * to the caller under two conditions: (1) if the specified * operation has completed or (2) if the caller requested a * non-blocking wait (waitflag = 0) in which case the current * status is simply returned. * * Return values: A value of 0 (zero) means the operation is * not complete (waitflag must be 0 for this to * occur.) * Otherwise, the lowlevel I/O status code is * returned, normally the number of bytes * transferred. -1 indicates an error. * */ int wait_thing( thing_t *thing, int waitflag) { int returnvalue; unsigned i; if ( thing->allocated < 0 ) { /* Hey! we've already waited on this */ fprintf(stderr,"wait_thing: Redundant wait ignored\n"); fprintf(Log,"wait_thing: Redundant wait ignored\n"); return thing->nbytes_xferred; } #if 0 if ( thing->allocated == 0 && !waitflag) { /* never been used in an I/O operation. */ return 0; } #endif do { if ( thing->complete ) { /* Return completion status and enqueue iop_thing on free list*/ thing->allocated= -thing->allocated; /* indicates we have waited on this */ if ( thing->status == 0 ) returnvalue= thing->nbytes_xferred; else returnvalue= -1; #ifdef DBGRIO fprintf(stderr,"cm %d wait done for thing %x\n", Myid, LOWEST(thing) ); fprintf(Log,"!!!!!!!!!!!!!!!!!!!!!!in wait thing, thing->status is %d, returning status of %d\n", returnvalue, thing->nbytes_xferred ); fflush(Log); #endif if ( thing->allocated == -2 ) free(thing); /* malloc made this, we're done with it. */ return returnvalue; } /* not done; if Non-Blocking wait is requested (waitflag = 0) then just return */ if (waitflag == 0) return(0); sleep(1); #ifdef DBGRIO fprintf(stderr,"cm %d waiting for thing %x\n", Myid, LOWEST(thing) ); #endif /* t_wait( SEM_WAIT_THING );*/ } while ( 1 ); #if 0 /* Otherwise, blocking is requested, then wait for something to finish...*/ t_wait( SEM_WAIT_THING ); #endif } /* end of waitthing() */ /*---------------------------------------------------------------- * An IO has finished - place the op on the return queue or * invoke the callback. */ void returnStatus( thing_t *iopp ) { unsigned i; /* If this is to do a callback, this is the place to do it. */ if ( iopp->callback != NULL ) { iopp->callback( iopp, iopp->callback_data ); } else { iopp->complete= 1; #ifdef DBGRIO if ( iopp->RIO ) fprintf(stderr,"cm %d RIO thing returning %x\n", Myid,LOWEST(iopp) ); else fprintf(stderr,"cm %d local thing returning %x\n", Myid,LOWEST(iopp) ); #endif #if 0 /* Release the Wait_Thing semaphore so that the Wait_Thing starts up */ t_release( SEM_WAIT_THING ); #endif } } /*---------------------------------------------------------------- * Blocking write (in window, using overlapped I/O solely for * the delivered performance). * * Returns -1, or if succesful, the number of bytes * transferred. */ unsigned write_bytes( HANDLE fd, void *buffer, unsigned nbytes, unsigned long offset ) { #ifdef WIN32 OVERLAPPED overlapped; #endif int pwrite(); /* missing from linux includes */ int ierr, result, iok; unsigned long nbytes_written; #ifdef WIN32 /* Overlapped[buf].Offset = (unsigned long)(bigoffset & 0xFFFFFFFFi64); Overlapped[buf].OffsetHigh = (unsigned long)((bigoffset>>32) & 0xFFFFFFFFi64); */ overlapped.Offset= offset; overlapped.OffsetHigh= 0; overlapped.hEvent= NULL; iok= WriteFile( fd, buffer, nbytes, NULL, &overlapped ); /* not in win98?? *nbytes_written & NULL &overlapped ); */ if ( !iok ) { /* fprintf(stderr,"writeFile returned %d\n", iok); */ ierr = GetLastError(); if ( ierr != ERROR_IO_PENDING ) { fprintf(stderr,"Error: write_thingy: writefile FAILED code=%d\n", ierr ); return ierr; } } /* else result= nbytes_written; */ #if 1 /* To use unbuffered I/O (windows only) these restrictions apply: // offsets must be multiples of volume sector size (GetDiskFreeSpace) // access must be byte multiples of the volume sector size // buffer address must be aligned on addresses multiples of sector size // VirtualAlloc will (over)guarantee that, as it returns // page aligned addresses. // Seems like more than its worth for now. */ /* Wait for the write */ iok= GetOverlappedResult( fd, &overlapped, &nbytes_written, TRUE ); if ( !iok ) { result= GetLastError(); fprintf(stderr,"getoverlappedresult FAILED code=%d\n",result); } else { result= nbytes_written; } #endif #else ierr= pwrite( fd, buffer, nbytes, offset ); if ( ierr > 0 ) result = ierr; else result= (unsigned)-1; #endif return result; } /*------------------------------------------------------------ * This is the guts of the write logic, called by readwrite_thingy */ void dowrite0_thingy( thing_t *iopp ) { HANDLE fd; /* unsigned n; */ unsigned ierr; char *cp; /* timers */ unsigned long start, stop; float elapsed; unsigned long offset, memoffset; unsigned nbytes2write, nbytes2fetch, nbytes2go; #ifdef DBGRIO int i; if ( iopp->RIO ) fprintf(stderr,"cm %d entering dowrite_thingy RIO origStatus=%x\n", Myid, LOWEST(iopp->origStatus ) ); /* else fprintf(stderr,"entering dowrite_thingy callback=%p\n",iopp->callback); */ #endif fd= open_thing( iopp->name, 0, 0 ); if ( fd == INVALID_HANDLE_VALUE ) { #ifdef WIN32 iopp->status= GetLastError(); #else iopp->status= errno; #endif iopp->nbytes_xferred= 0; fprintf(stderr,"Error: write_thingy: open_thing '%s' failed\n",iopp->name ); goto nogo; } nbytes2go= iopp->nbytes; /* whole transfer size */ offset= iopp->offset; /* offset measured against thing: destination offset */ memoffset= 0; /* offset measured against memory: source */ nbytes2write= iopp->nbytes; /* defaults for single non-rio transfer */ iopp->nbytes_xferred= 0; #if 0 /* since we're not using overlapped io currently, set initial pos */ n = GetFileSize( fd, NULL ); SetFilePointer( fd, offset, NULL, FILE_BEGIN ); /* since WINDOWS does not automatically extend the size of a file, * perform a null write just in case this is extending it. */ if ( offset>n ) { fprintf(stderr,"Warning: write_thing extending file size, offset %u > thing size= %d\n", offset, n); WriteFile( fd, NULL,0, &ierr, NULL ); } #endif start= getMillisec(); /* For RIO, start fetch first buffer */ if (iopp->RIO) { nbytes2fetch= nbytes2go; if ( nbytes2fetch > MAXRIOSIZE ) nbytes2fetch= MAXRIOSIZE; /* release mpi_thingy to get this data. When its been recvd, * SEM_RIOMPI will be notified */ RIOinfo.sourceAddr= iopp->localaddress; RIOinfo.destAddr= RIObuffer[0]; RIOinfo.nbytes= nbytes2fetch; RIOinfo.node= iopp->origNode; RIOinfo.start= 2; while ( RIOinfo.start ) Sleep(1); } nextbuffer: /* For RIO, wait for current buffer fetch, then start prefetch of next buffer (if any) */ if (iopp->RIO) { t_wait( SEM_RIOMPI ); nbytes2write= nbytes2fetch; /* flip buffers, the MPI data is ready in buffer 0 before this point */ cp= RIObuffer[0]; RIObuffer[0]= RIObuffer[1]; RIObuffer[1]= cp; nbytes2fetch= (nbytes2go-nbytes2fetch); if ( nbytes2fetch > MAXRIOSIZE ) nbytes2fetch= MAXRIOSIZE; /* fprintf(stderr,"got sem_riompi, nbytes2fetch=%d \n", nbytes2fetch ); */ /* release mpi_thingy to get this data. */ if ( nbytes2fetch ) { memoffset += RIOinfo.nbytes; RIOinfo.sourceAddr= (char*)(iopp->localaddress) + memoffset; RIOinfo.destAddr= RIObuffer[0]; RIOinfo.nbytes= nbytes2fetch; RIOinfo.node= iopp->origNode; RIOinfo.start= 2; while (RIOinfo.start) Sleep(1); } } /* fprintf(stderr," @nextbuffer nbytes2write=%d, nbytes2go=%d\n", nbytes2write, nbytes2go ); */ if (iopp->RIO) ierr= write_bytes( fd, RIObuffer[1], nbytes2write, offset ); else ierr= write_bytes( fd, iopp->localaddress, nbytes2write, offset ); if ( ierr != nbytes2write ) { fprintf(stderr,"write_bytes returned %d not %d\n", ierr, nbytes2write ); iopp->status= -1; goto nogo; } else { /* fprintf(stderr,"write_bytes returned correct %d bytes\n", nbytes2write ); */ iopp->nbytes_xferred+= ierr; iopp->status= 0; } offset += nbytes2write; nbytes2go -= nbytes2write; if ( nbytes2go ) goto nextbuffer; stop= getMillisec(); if ( stop == start ) stop++; /* ? just in case */ stop -= start; elapsed= ((float)stop) * 0.001f; iopp->elapsedio= elapsed; /* iopp->mbytesec= ((float)(iopp->nbytes_xferred)/(1024.0f*1024.0f)) / elapsed; */ #if 0 fdatasync(fd); #endif CLOSEFILE(fd); nogo: if ( iopp->RIO ) { _ASSERTE( iopp->allocated == 2 ); /* Send this back to the originator */ RIOstatus= *iopp; free(iopp); #ifdef DBGRIO fprintf(stderr,"cm %d sending RIO status for thing %x back to %d\n", Myid, LOWEST(RIOstatus.origStatus), RIOstatus.origNode); #endif SendRIOstatus= 1; /* RIOinfo.start= 3; while (RIOinfo.start) Sleep(1); */ while (SendRIOstatus) Sleep(1); } else returnStatus( iopp ); } /* end of dowrite0_thingy() */ /*------------------------------------------------------------ * This is the guts of the write logic, called by readwrite_thingy * It now relies on rio functions for remote i/o. */ void dowrite_thingy( thing_t *iopp ) { HANDLE fd; /* unsigned n; */ unsigned ierr; char *cp; char name[THING_NAME_LEN]; /* timers */ unsigned long start, stop; float elapsed; unsigned long offset, memoffset; unsigned nbytes2write, nbytes2fetch, nbytes2go; int i; #if 0 if ( iopp->RIO ) fprintf(Log,"%s entering dowrite_thingy RIO for '%s'\n",timestamp(),iopp->name); else fprintf(Log,"%s entering doread_thingy for '%s'\n",timestamp(),iopp->name); /*if ( iopp->RIO ) fprintf(stderr,"cm %d entering dowrite_thingy RIO origStatus=%x\n", Myid, LOWEST(iopp->origStatus ) ); */ /* else fprintf(stderr,"entering dowrite_thingy callback=%p\n",iopp->callback); */ #endif start= getMillisec(); if ( iopp->RIO ) { /* compose name as root directory name + this name */ strcpy(name,RootDirName); strcat(name,iopp->name); /* fprintf(stderr, "rio write node %d '%s' name='%s'\n",iopp->home_node, Hostnames[iopp->home_node], name ); */ iopp->nbytes_xferred= rio_writeh( Hostnames[iopp->home_node], name, iopp->localaddress, iopp->nbytes, iopp->offset ); /* fprintf(stderr,"rio write status=%d\n", iopp->nbytes_xferred ); */ } else { fd= open_thing( iopp->name, 0, 0 ); if ( fd == INVALID_HANDLE_VALUE ) { #ifdef WIN32 iopp->status= GetLastError(); #else iopp->status= errno; #endif iopp->nbytes_xferred= 0; fprintf(stderr,"Error: write_thingy: open_thing '%s' failed\n",iopp->name ); goto nogo; } nbytes2go= iopp->nbytes; /* whole transfer size */ offset= iopp->offset; /* offset measured against thing: destination offset */ memoffset= 0; /* offset measured against memory: source */ nbytes2write= iopp->nbytes; /* defaults for single non-rio transfer */ iopp->nbytes_xferred= 0; #if 0 /* since we're not using overlapped io currently, set initial pos */ n = GetFileSize( fd, NULL ); SetFilePointer( fd, offset, NULL, FILE_BEGIN ); /* since WINDOWS does not automatically extend the size of a file, * perform a null write just in case this is extending it. */ if ( offset>n ) { fprintf(stderr,"Warning: write_thing extending file size, offset %u > thing size= %d\n", offset, n); WriteFile( fd, NULL,0, &ierr, NULL ); } #endif nextbuffer: ierr= write_bytes( fd, iopp->localaddress, nbytes2write, offset ); if ( ierr != nbytes2write ) { fprintf(stderr,"write_bytes returned %d not %d\n", ierr, nbytes2write ); iopp->status= -1; CLOSEFILE(fd); goto nogo; } else { /* fprintf(stderr,"write_bytes returned correct %d bytes\n", nbytes2write ); */ iopp->nbytes_xferred+= ierr; iopp->status= 0; } offset += nbytes2write; nbytes2go -= nbytes2write; if ( nbytes2go ) goto nextbuffer; #if 0 fdatasync(fd); #endif CLOSEFILE(fd); } stop= getMillisec(); if ( stop == start ) stop++; /* ? just in case */ stop -= start; elapsed= ((float)stop) * 0.001f; iopp->elapsedio= elapsed; /* iopp->mbytesec= ((float)(iopp->nbytes_xferred)/(1024.0f*1024.0f)) / elapsed; */ nogo: returnStatus( iopp ); } /* end of dowrite_thingy() */ /*---------------------------------------------------------------- * Blocking read (in windows, using overlapped I/O solely for * the delivered performance). * * Returns -1, or in the unlikely event of success, the number of bytes * transferred. */ unsigned read_bytes( HANDLE fd, void *buffer, unsigned nbytes, unsigned long offset ) { #ifdef WIN32 OVERLAPPED overlapped; #endif int pread(); /* missing from linux */ int ierr, iok; unsigned result, nbytes_xferred; #ifdef WIN32 /* Overlapped[buf].Offset = (unsigned long)(bigoffset & 0xFFFFFFFFi64); Overlapped[buf].OffsetHigh = (unsigned long)((bigoffset>>32) & 0xFFFFFFFFi64); */ overlapped.Offset= offset; overlapped.OffsetHigh= 0; overlapped.hEvent= NULL; iok= ReadFile( fd, buffer, nbytes, NULL, &overlapped ); /* not in win98 , &nbytes_xferred, NULL ); */ if ( !iok ) { ierr = GetLastError(); if ( ierr != ERROR_IO_PENDING ) { fprintf(stderr,"Error: read_thingy: readfile FAILED code=%d\n", ierr ); return ierr; } } /* else result= nbytes_xferred; */ #if 1 /* Wait for the read */ iok= GetOverlappedResult( fd, &overlapped, &nbytes_xferred, TRUE ); if ( !iok ) { ierr= GetLastError(); /* fprintf(stderr,"getoverlappedresult FAILED code=%d\n",ierr); */ } else { result= nbytes_xferred; } #endif #else #if 0 fprintf(Log,"about to read %d bytes @%d to %p\n", nbytes, offset, buffer ); fprintf(Log,"about to read %d bytes @%d to %p\n", nbytes, offset, buffer ); fflush(Log); #endif lseek( fd, offset, SEEK_SET ); nbytes_xferred= read( fd, buffer, nbytes ); #if 0 fprintf(Log,"read %d bytes @%d to %p\n", nbytes, offset, buffer ); fflush(Log); #endif #if 0 nbytes_xferred= pread( fd, buffer, nbytes, offset ); #endif if ( nbytes_xferred == -1 ) result = -1; else result = nbytes_xferred; /* fprintf(Log," with result %d\n", result ); */ #endif return result; } /*----------------------------------------------------------------- * Guts of the read for readwrite_thingy */ void doread0_thingy( thing_t *iopp ) { HANDLE fd; /* unsigned n; */ unsigned ierr; char *cp; /* timers */ unsigned long start, stop; float elapsed; unsigned nbytes2read, memoffset, nbytes2go; unsigned long offset; #ifdef DBGRIO if ( iopp->RIO ) fprintf(stderr,"entering doread_thingy RIO\n"); else fprintf(stderr,"entering doread_thingy\n"); #endif fd= open_thing( iopp->name, 0, 0 ); if ( fd == INVALID_HANDLE_VALUE ) { /* return error in iopp result */ #ifdef WIN32 iopp->status= GetLastError(); #else iopp->status= errno; #endif iopp->nbytes_xferred= 0; goto nogo; } nbytes2go= iopp->nbytes; /* whole transfer size */ offset= iopp->offset; /* offset measured against thing: source offset */ memoffset= 0; /* offset measured against memory: destination */ nbytes2read= nbytes2go; /* The only piecewise reads are RIO */ if ( iopp->RIO && nbytes2read > MAXRIOSIZE ) nbytes2read= MAXRIOSIZE; iopp->nbytes_xferred= 0; #if 0 n = GetFileSize( fd, NULL ); if ( offset > n ) { fprintf(stderr,"Error: readh_thingy offset %d > thing size of %d\n", offset, n ); iopp->status= -1; CloseHandle(fd); goto nogo; } /* since we're not using overlapped io currently, set initial pos */ SetFilePointer( fd, offset, NULL, FILE_BEGIN ); #endif start = getMillisec(); if ( iopp->RIO ) ierr= read_bytes( fd, RIObuffer[0], nbytes2read, offset ); else ierr= read_bytes( fd, iopp->localaddress, nbytes2read, offset ); if ( ierr != nbytes2read ) { fprintf(stderr,"read bytes returned %d not %d\n", ierr, nbytes2read ); iopp->status = -1; } else { iopp->status= 0; iopp->nbytes_xferred += ierr; } if ( iopp->RIO ) { cp= RIObuffer[0]; RIObuffer[0]= RIObuffer[1]; RIObuffer[1]= cp; } nextbuffer: #if 0 fprintf(stderr," @nextbuffer nbytes2go=%d, nbytes2read=%d\n", nbytes2go, nbytes2read ); #endif /* start send of current buffer */ if ( iopp->RIO ) { RIOinfo.sourceAddr= RIObuffer[1]; RIOinfo.destAddr= (char*)iopp->localaddress + memoffset; RIOinfo.nbytes= nbytes2read; RIOinfo.node= iopp->origNode; RIOinfo.start= 1; while ( RIOinfo.start ) Sleep(1); memoffset += nbytes2read; } nbytes2go -= nbytes2read; offset += nbytes2read; /* read next buffer, if any */ nbytes2read= nbytes2go; if ( nbytes2read > MAXRIOSIZE ) nbytes2read= MAXRIOSIZE; if ( iopp->RIO ) { if ( nbytes2read > 0 ) { ierr= read_bytes( fd, RIObuffer[0], nbytes2read, offset ); if ( ierr != nbytes2read ) { fprintf(stderr,"read_bytes returned %d not %d\n", ierr, nbytes2read ); iopp->status= -1; } else { iopp->status= 0; iopp->nbytes_xferred+= ierr; } cp= RIObuffer[0]; RIObuffer[0]= RIObuffer[1]; RIObuffer[1]= cp; } /* wait for completion of MPI send */ t_wait( SEM_RIOMPI ); } if ( nbytes2go ) goto nextbuffer; #if 0 fdatasync(fd); #endif CLOSEFILE(fd); stop= getMillisec(); if ( stop == start ) stop++; /* ? just in case */ stop -= start; elapsed= ((float)stop) * 0.001f; iopp->elapsedio= elapsed; /* iopp->mbytesec= ((float)(iopp->nbytes_xferred)/(1024.0f*1024.0f)) / elapsed; */ /*fprintf(stderr," start %u stop %u elapsed %f \n", start,stop, elapsed ); fprintf(stderr,"read_thingy: oooo - read %d bytes from %s\n",iopp->status,iopp->name); */ nogo: if ( iopp->RIO ) { /* Send this back to the originator */ RIOstatus= *iopp; _ASSERTE( iopp->allocated == 2 ); free(iopp); SendRIOstatus= 1; while (SendRIOstatus) Sleep(1); /* RIOinfo.start= 3; while (RIOinfo.start) Sleep(1); */ } else returnStatus( iopp ); } /*----------------------------------------------------------------- * Guts of the read for readwrite_thingy, modified to rely on rio */ void doread_thingy( thing_t *iopp ) { HANDLE fd; /* unsigned n; */ unsigned ierr; char *cp; char name[THING_NAME_LEN]; /* timers */ unsigned long start, stop; float elapsed; unsigned nbytes2read, memoffset, nbytes2go; unsigned long offset; #if 0 if ( iopp->RIO ) fprintf(Log,"%s entering doread_thingy RIO for '%s'\n",timestamp(),iopp->name); else fprintf(Log,"%s entering doread_thingy for '%s'\n",timestamp(),iopp->name); #endif start = getMillisec(); if ( iopp->RIO ) { /* compose name as root directory name + this name */ strcpy(name,RootDirName); strcat(name,iopp->name); /* fprintf(stderr, "rio read node %d '%s' name='%s'\n",iopp->home_node, Hostnames[iopp->home_node], name ); */ iopp->nbytes_xferred= rio_readh( Hostnames[iopp->home_node],name, iopp->localaddress, iopp->nbytes, iopp->offset ); } else { fd= open_thing( iopp->name, 0, 0 ); if ( fd == INVALID_HANDLE_VALUE ) { /* return error in iopp result */ fprintf(Log,"open_thing returned Error %d %s\n", fd, strerror(errno) ); #ifdef WIN32 iopp->status= GetLastError(); #else iopp->status= errno; #endif iopp->nbytes_xferred= 0; CLOSEFILE(fd); goto nogo; } nbytes2go= iopp->nbytes; /* whole transfer size */ offset= iopp->offset; /* offset measured against thing: source offset */ memoffset= 0; /* offset measured against memory: destination */ /* fprintf(Log,"reading %d bytes\n", nbytes2go); */ nbytes2read= nbytes2go; /* The only piecewise reads are RIO */ if ( iopp->RIO && nbytes2read > MAXRIOSIZE ) nbytes2read= MAXRIOSIZE; iopp->nbytes_xferred= 0; #if 0 n = GetFileSize( fd, NULL ); if ( offset > n ) { fprintf(stderr,"Error: readh_thingy offset %d > thing size of %d\n", offset, n ); iopp->status= -1; CloseHandle(fd); goto nogo; } /* since we're not using overlapped io currently, set initial pos */ SetFilePointer( fd, offset, NULL, FILE_BEGIN ); #endif ierr= read_bytes( fd, iopp->localaddress, nbytes2read, offset ); /* fprintf(Log,"read bytes returned %d trying for %d\n", ierr, nbytes2read ); */ if ( ierr != nbytes2read ) { iopp->status = -1; } else { iopp->status= 0; iopp->nbytes_xferred += ierr; } nextbuffer: /* fprintf(Log," @nextbuffer nbytes2go=%d, nbytes2read=%d\n", nbytes2go, nbytes2read ); */ /* start send of current buffer */ nbytes2go -= nbytes2read; offset += nbytes2read; /* read next buffer, if any */ nbytes2read= nbytes2go; if ( nbytes2read > MAXRIOSIZE ) nbytes2read= MAXRIOSIZE; if ( nbytes2go ) goto nextbuffer; #if 0 fdatasync(fd); #endif CLOSEFILE(fd); } stop= getMillisec(); if ( stop == start ) stop++; /* ? just in case */ stop -= start; elapsed= ((float)stop) * 0.001f; iopp->elapsedio= elapsed; /* iopp->mbytesec= ((float)(iopp->nbytes_xferred)/(1024.0f*1024.0f)) / elapsed; */ /* fprintf(Log," start %u stop %u elapsed %f \n", start,stop, elapsed ); fprintf(Log,"read_thingy: oooo - read %d bytes from %s\n",iopp->nbytes_xferred,iopp->name); */ nogo: if ( Log!=NULL) fprintf(Log,"%s nbytes_xferred=%d\n", timestamp(),iopp->nbytes_xferred ); returnStatus( iopp ); fflush(Log); } /*------------------------------------------------------------ * This routine is the io daemon that sits in the background * and actually proceses read/write requests as they appear in the * readwriteInbasket. When something is put into the Inbasket, * a semaphore is updated and this process is released to do its * worst. */ DWORD WINAPI readwrite_thingy(void) { thing_t *iopp; /* Pointer to an IOP Thing */ readwrite_thingy_state = THING_RUNNING; /* wait for read requests */ while(readwrite_thingy_state == THING_RUNNING) { t_wait( SEM_READWRITE_THINGY ); if ( readwrite_thingy_state != THING_RUNNING ) break; /* Now that we have something to do, pardon the pun.... */ /* Get the next iop_thing from the readInbasket queue */ iopp = dequeue_iop_thing(&readWriteBasket,NULL); /* check to see that we really have something to do */ if (iopp == NULL ) { /* should never ever happen */ /* fprintf(stderr,"Error: readwrite_thingy: Nothing in the Inbasket!\n"); */ } else { iopp->complete= 0; if ( iopp->rorw == 0) doread_thingy(iopp); else dowrite_thingy(iopp); } } readwrite_thingy_state= THING_STOPPED; return(0); } /* end of read_thingy() */ /*------------------------------------------------------------*/ /* * This function will put an iopthing on the specified queue. * * Return values: none * */ void enqueue_iop_thing( thing_t *iopp, iop_queue_t *qp) { _ASSERTE( iopp != NULL ); /* doh! */ t_lock( qp->mutex ) ; /* Get the queue lock */ /* Put this request in the queue */ if (qp->last != NULL) { /* Tack this iopp to the end of the list */ qp->last->next = iopp; iopp->next = NULL; iopp->prev = qp->last; } else { /* This is now the first and only iopp on the list */ qp->first = iopp; iopp->next = NULL; iopp->prev = NULL; } qp->last = iopp; t_unlock( qp->mutex ); return; /* success */ } /* end of enqueue_iop_thing() */ /*------------------------------------------------------------*/ /* * This function will get an iopthing from the specified queue. * The first variable passed into this function is the pointer to * a specific iop_thing that is to be dequeued from the queue. If this * parameter is NULL then the first iop_thing on the queue is * detached and returned to the caller. Otherwise, a search for * the specific iop_thing is performed and that iop_thing is * removed from the specified queue and the pointer to that iop_thing * is returned to the caller. If the specified iop_thing is not * found or if there are no iop_things on the queue then a NULL * pointer is returned. * * Return values: pointer to an iop_thing or NULL is none was found. * */ thing_t* dequeue_iop_thing(iop_queue_t *qp, thing_t *ip) { thing_t *iopp; /* pointer to the iop_thing */ /* Get the queue lock */ t_lock( qp->mutex ); if (ip != NULL ) { /* locate this iop_thing on the queue and remove it */ /* scan the queue until a match is found or the end of the queue */ iopp = qp->first; while (iopp!=NULL && ip!=iopp) iopp= iopp->next; if (iopp!=NULL) { /* Found it. Detatch it */ /* update queue pointers? */ if ( iopp == qp->first ) { qp->first = iopp->next; /* fprintf(stderr,"dequeue_iop_thing: updating qp->first to %p\n", qp->first);*/ if ( qp->first == NULL ) qp->last= NULL; } else if ( iopp == qp->last ) qp->last = iopp->prev; if(iopp->prev != NULL ) /* previous one points to the next one */ iopp->prev->next = iopp->next; if(iopp->next != NULL ) /* next one points to the previous one */ iopp->next->prev = iopp->prev; iopp->next = NULL; /* clear out pointers in the removed one */ iopp->prev = NULL; } } else { /* remove the first thing off the queue */ iopp = qp->first; if (iopp != NULL) { /* Reconnect the list */ qp->first = iopp->next; iopp->next= NULL; } if ( qp->first == NULL ) qp->last= NULL; } t_unlock( qp->mutex ); return iopp ; } /* end of dequeue_iop_thing() */ /*------------------------------------------------------------*/ /* * This function will puts a work request on the queue (presumably * the master's). * * Return values: none * */ void enqueue_work_thing( work_item_t *work, work_queue_t *qp) { /* Get the queue lock */ t_lock( qp->mutex ); /* Put this request in the queue */ if (qp->last != NULL) { /* Tack this to the end of the list */ qp->last->next = work; work->next = NULL; work->prev = qp->last; } else { /* is now the first and only on the list */ qp->first = work; work->next = NULL; work->prev = NULL; } qp->last= work; /* Release the queue lock */ t_unlock( qp->mutex ); return; } /* end of enqueue_work_thing() */ /*------------------------------------------------------------ * Return the 'top' element off the work queue, or null if the * queue is empty. */ work_item_t * dequeue_work_thing(work_queue_t *qp) { work_item_t *work; /* Get the queue lock */ t_lock( qp->mutex ); work= qp->first; /* Empty list? */ if ( work == NULL ) { t_unlock( qp->mutex ); return work; } /* snap off the first element - update back pointer of the * list top */ qp->first= work->next; if ( qp->first != NULL ) { qp->first->prev= NULL; } else /* if this was the last one, clear the end pointer */ qp->last= NULL; /* Release the queue lock */ t_unlock( qp->mutex ); return(work); } /* end of dequeue_work_thing() */