/[MITgcm]/MITgcm_contrib/llc_hires/llc_1080/code-async/recvTask.c

Revision 1.1
Sun Sep 29 22:43:01 2013 UTC (11 years, 10 months ago) by dimitri
Branch: MAIN
File MIME type: text/plain
adding code-async for llc1080

// Code to do the m-on-n fan-in, recompositing, and output of data
// tiles for MITgcm.
//
// There are aspects of this code that would be simpler in C++, but
// we deliberately wrote the code in ANSI C to make linking it with
// the Fortran main code easier (it should be possible to just include
// the .o on the link line).
9    
10    
11     #include "PACKAGES_CONFIG.h"
12    
13     #include <stdio.h>
14     #include <unistd.h>
15     #include <stdlib.h>
16     #include <string.h>
17     #include <sys/types.h>
18     #include <sys/stat.h>
19     #include <fcntl.h>
20    
21     #include <mpi.h>
22     #include <alloca.h>
23    
24    
25     #include <stdint.h>
26     #include <errno.h>
27    
28     #define DEBUG 1
29    
#if (DEBUG >= 2)
#define FPRINTF fprintf
#else
// When DEBUG < 2, debug prints become a variadic no-op (the arguments are
// still evaluated, but nothing is printed).
#include <stdarg.h>
void FPRINTF(FILE *fp,...){return;}
#endif
36    
37    
// Define our own version of "assert", where we sleep rather than abort.
// This makes it easier to attach the debugger to the stuck process.
#if (DEBUG >= 1)
#define ASSERT(_expr) \
    do { \
        if (!(_expr)) { \
            fprintf(stderr, "ASSERT failed for pid %d : `%s': %s: %d: %s\n", \
                    getpid(), #_expr, __FILE__, __LINE__, __func__); \
            sleep(9999); \
        } \
    } while (0)
#else
#include <assert.h>   // needed for the fall-back definition below
#define ASSERT assert
#endif
50    
51    
52    
53     // If numRanksPerNode is set to be > 0, we just use that setting.
54     // If it is <= 0, we dynamically determine the number of cores on
55     // a node, and use that. (N.B.: we assume *all* nodes being used in
56     // the run have the *same* number of cores.)
57     int numRanksPerNode = 0;
58    
59     // Just an error check; can be zero (if you're confident it's all correct).
60     #define numCheckBits 2
61     // This bitMask definition works for 2's complement
62     #define bitMask ((1 << numCheckBits) - 1)
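// Sketch of how the check bits are used (illustrative helpers only; nothing
// in this file calls them): the senders are expected to tag each tile message
// as (tileID << numCheckBits) | (epochID & bitMask), which is what the
// decoding in tryToReceiveDataTile() below assumes, i.e.
// tileID = tag >> numCheckBits, with the low bits cross-checked against the
// current epoch ID.
static int makeTileTag(int tileID, int epochID)
{
    return (tileID << numCheckBits) | (epochID & bitMask);
}
static int tagToTileID(int tag)
{
    return tag >> numCheckBits;
}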
63    
64    
65     ///////////////////////////////////////////////////////////////////////
66     // Info about the data fields
67    
68     // double for now. Might also be float someday
69     #define datumType double
70     #define datumSize (sizeof(datumType))
71    
72    
73     // Info about the data fields. We assume that all the fields have the
74     // same X and Y characteristics, but may have either 1 or NUM_Z levels.
75     //
76     // the following 3 values are required here. Could be sent over or read in
77     // with some rearrangement in init routines
78     //
79     #define NUM_X 1080
80     #define NUM_Y 14040L // get rid of this someday
81     #define NUM_Z 90
82     #define MULTDIM 7
83     #define twoDFieldSizeInBytes (NUM_X * NUM_Y * 1 * datumSize)
84     #define threeDFieldSizeInBytes (twoDFieldSizeInBytes * NUM_Z)
85     #define multDFieldSizeInBytes (twoDFieldSizeInBytes * MULTDIM)
86    
87     // Info about the data tiles. We assume that all the tiles are the same
88     // size (no odd-sized last piece), they all have the same X and Y
89     // characteristics (including ghosting), and are full depth in Z
90     // (either 1 or NUM_Z as appropriate).
91     //
92     // all these data now sent over from compute ranks
93     //
94     int TILE_X = -1;
95     int TILE_Y = -1;
96     int XGHOSTS = -1;
97     int YGHOSTS = -1;
98    
99     // Size of one Z level of a tile (NOT one Z level of a whole field)
100     int tileOneZLevelItemCount = -1;
101     int tileOneZLevelSizeInBytes = -1;
102    
103    
104     typedef struct dataFieldDepth {
105     char dataFieldID;
106     int numZ;
107     } dataFieldDepth_t;
108    
109     dataFieldDepth_t fieldDepths[] = {
110     { 'u', NUM_Z },
111     { 'v', NUM_Z },
112     { 'w', NUM_Z },
113     { 't', NUM_Z },
114     { 's', NUM_Z },
115     { 'x', NUM_Z },
116     { 'y', NUM_Z },
117     { 'n', 1 },
118     { 'd', 1 },
119     { 'h', 1 },
120     { 'a', MULTDIM }, // seaice, 7 == MULTDIM in SEAICE_SIZE.h
121     { 'b', 1 },
122     { 'c', 1 },
    { 'd', 1 },   // NB: 'd' appears twice in this table; both entries are 1 level deep, so findField() is unaffected
124     { 'e', 1 },
125     { 'f', 1 },
126     { 'g', 1 },
127     };
128     #define numAllFields (sizeof(fieldDepths)/sizeof(dataFieldDepth_t))
129    
130    
131    
132     ///////////////////////////////////////////////////////////////////////
133     // Info about the various kinds of i/o epochs
134     typedef struct epochFieldInfo {
135     char dataFieldID;
136     MPI_Comm registrationIntercomm;
137     MPI_Comm dataIntercomm;
138     MPI_Comm ioRanksIntracomm;
139     int tileCount;
140     int zDepth; // duplicates the fieldDepth entry; filled in automatically
141     char filenameTemplate[128];
142     long int offset;
143     int pickup;
144     } fieldInfoThisEpoch_t;
145    
146     // The normal i/o dump
147     fieldInfoThisEpoch_t fieldsForEpochStyle_0[] = {
148     { 'u', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "U.%010d.%s", 0, 0 },
149     { 'v', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "V.%010d.%s", 0, 0 },
150     { 't', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "T.%010d.%s", 0,0 },
151     { 'n', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Eta.%010d.%s", 0,0 },
152     {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0,0 },
153     };
154    
155    
156     // pickup file
157     fieldInfoThisEpoch_t fieldsForEpochStyle_1[] = {
158     { 'u', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 1},
159     { 'v', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 1},
160     { 't', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 2 + twoDFieldSizeInBytes * 0, 1},
161     { 's', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 3 + twoDFieldSizeInBytes * 0, 1},
162     { 'x', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 4 + twoDFieldSizeInBytes * 0, 1},
163     { 'y', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 5 + twoDFieldSizeInBytes * 0, 1},
164     { 'n', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 0, 1},
165     { 'd', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 1, 1},
166     { 'h', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 2, 1},
167     {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 ,1},
168     };
169    
170    
171     // seaice pickup
172     fieldInfoThisEpoch_t fieldsForEpochStyle_2[] = {
173     { 'a', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 2},
174     { 'b', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 2},
175     { 'c', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 1, 2},
176     { 'd', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 2, 2},
177     { 'g', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 3, 2},
178     { 'e', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 4, 2},
179     { 'f', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 5, 2},
180     {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 },
181     };
182    
183    
184     fieldInfoThisEpoch_t *epochStyles[] = {
185     fieldsForEpochStyle_0,
186     fieldsForEpochStyle_1,
187     fieldsForEpochStyle_2,
188     };
189     int numEpochStyles = (sizeof(epochStyles) / sizeof(fieldInfoThisEpoch_t *));
190    
191    
192     typedef enum {
193     cmd_illegal,
194     cmd_newEpoch,
195     cmd_epochComplete,
196     cmd_exit,
197     } epochCmd_t;
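// The command itself arrives at i/o rank 0 as a 4-int message over the global
// intercomm (see the MPI_Recv in ioRankMain()):
//   cmd[0] = one of the epochCmd_t values above
//   cmd[1] = i/o epoch ID (expected to increase by exactly 1 each epoch)
//   cmd[2] = epoch style index into epochStyles[]
//   cmd[3] = gcmIter, the model timestep number used in output filenames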
198    
199    
200     ///////////////////////////////////////////////////////////////////////
201    
202     // Note that a rank will only have access to one of the Intracomms,
203     // but all ranks will define the Intercomm.
204     MPI_Comm ioIntracomm = MPI_COMM_NULL;
205     int ioIntracommRank = -1;
206     MPI_Comm computeIntracomm = MPI_COMM_NULL;
207     int computeIntracommRank = -1;
208     MPI_Comm globalIntercomm = MPI_COMM_NULL;
209    
210    
211     #define divCeil(_x,_y) (((_x) + ((_y) - 1)) / (_y))
212     #define roundUp(_x,_y) (divCeil((_x),(_y)) * (_y))
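// Example of how these are used below: doNewEpoch() computes
// zSlabsPer = divCeil(zDepth, intracommSize), so a 3D field with NUM_Z = 90
// levels split across, say, 4 i/o ranks gives divCeil(90,4) == 23 levels per
// rank, with the last rank picking up only the remaining 90 - 3*23 == 21.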
213    
214    
215    
216     typedef enum {
217     bufState_illegal,
218     bufState_Free,
219     bufState_InUse,
220     } bufState_t;
221    
222    
typedef struct buf_header {
    struct buf_header *next;
    bufState_t state;
    int requestsArraySize;
    MPI_Request *requests;
    uint64_t ck0;     // guard word for overrun checks
    char *payload;    // max payload size is tileOneZLevelSizeInBytes * NUM_Z;
                      // that size is now received from the compute ranks, so the
                      // buffer is allocated dynamically in allocateTileBufs()
    uint64_t ck1;     // second guard word; must always equal ck0
} bufHdr_t;
232    
233    
234     bufHdr_t *freeTileBufs_ptr = NULL;
235     bufHdr_t *inUseTileBufs_ptr = NULL;
236    
237     int maxTagValue = -1;
238     int totalNumTiles = -1;
239    
// Routine to convert double to float during memcpy
// (byte swapping still needs to be folded in here as well).
void
memcpy_r8_2_r4 (float *f, double *d, long long *n)
{
244     long long i, rem;
245     rem = *n%16LL;
246     for (i = 0; i < rem; i++) {
247     f [i] = d [i];
248     }
249     for (i = rem; i < *n; i += 16) {
250     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 10" : : "m" (d [i + 256 + 0]) );
251     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 11" : : "m" (f [i + 256 + 0]) );
252     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm0 # memcpy_r8_2_r4.c 12" : : "m" (d [i + 0]) : "%xmm0");
253     __asm__ __volatile__ ("movss %%xmm0, %0 # memcpy_r8_2_r4.c 13" : "=m" (f [i + 0]) : : "memory");
254     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm1 # memcpy_r8_2_r4.c 14" : : "m" (d [i + 1]) : "%xmm1");
255     __asm__ __volatile__ ("movss %%xmm1, %0 # memcpy_r8_2_r4.c 15" : "=m" (f [i + 1]) : : "memory");
256     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm2 # memcpy_r8_2_r4.c 16" : : "m" (d [i + 2]) : "%xmm2");
257     __asm__ __volatile__ ("movss %%xmm2, %0 # memcpy_r8_2_r4.c 17" : "=m" (f [i + 2]) : : "memory");
258     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm3 # memcpy_r8_2_r4.c 18" : : "m" (d [i + 3]) : "%xmm3");
259     __asm__ __volatile__ ("movss %%xmm3, %0 # memcpy_r8_2_r4.c 19" : "=m" (f [i + 3]) : : "memory");
260     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm4 # memcpy_r8_2_r4.c 20" : : "m" (d [i + 4]) : "%xmm4");
261     __asm__ __volatile__ ("movss %%xmm4, %0 # memcpy_r8_2_r4.c 21" : "=m" (f [i + 4]) : : "memory");
262     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm5 # memcpy_r8_2_r4.c 22" : : "m" (d [i + 5]) : "%xmm5");
263     __asm__ __volatile__ ("movss %%xmm5, %0 # memcpy_r8_2_r4.c 23" : "=m" (f [i + 5]) : : "memory");
264     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm6 # memcpy_r8_2_r4.c 24" : : "m" (d [i + 6]) : "%xmm6");
265     __asm__ __volatile__ ("movss %%xmm6, %0 # memcpy_r8_2_r4.c 25" : "=m" (f [i + 6]) : : "memory");
266     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm7 # memcpy_r8_2_r4.c 26" : : "m" (d [i + 7]) : "%xmm7");
267     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 27" : : "m" (d [i + 256 + 8 + 0]) );
268     __asm__ __volatile__ ("movss %%xmm7, %0 # memcpy_r8_2_r4.c 28" : "=m" (f [i + 7]) : : "memory");
269     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm8 # memcpy_r8_2_r4.c 29" : : "m" (d [i + 8]) : "%xmm8");
270     __asm__ __volatile__ ("movss %%xmm8, %0 # memcpy_r8_2_r4.c 30" : "=m" (f [i + 8]) : : "memory");
271     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm9 # memcpy_r8_2_r4.c 31" : : "m" (d [i + 9]) : "%xmm9");
272     __asm__ __volatile__ ("movss %%xmm9, %0 # memcpy_r8_2_r4.c 32" : "=m" (f [i + 9]) : : "memory");
273     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm10 # memcpy_r8_2_r4.c 33" : : "m" (d [i + 10]) : "%xmm10");
274     __asm__ __volatile__ ("movss %%xmm10, %0 # memcpy_r8_2_r4.c 34" : "=m" (f [i + 10]) : : "memory");
275     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm11 # memcpy_r8_2_r4.c 35" : : "m" (d [i + 11]) : "%xmm11");
276     __asm__ __volatile__ ("movss %%xmm11, %0 # memcpy_r8_2_r4.c 36" : "=m" (f [i + 11]) : : "memory");
277     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm12 # memcpy_r8_2_r4.c 37" : : "m" (d [i + 12]) : "%xmm12");
278     __asm__ __volatile__ ("movss %%xmm12, %0 # memcpy_r8_2_r4.c 38" : "=m" (f [i + 12]) : : "memory");
279     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm13 # memcpy_r8_2_r4.c 39" : : "m" (d [i + 13]) : "%xmm13");
280     __asm__ __volatile__ ("movss %%xmm13, %0 # memcpy_r8_2_r4.c 40" : "=m" (f [i + 13]) : : "memory");
281     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm14 # memcpy_r8_2_r4.c 41" : : "m" (d [i + 14]) : "%xmm14");
282     __asm__ __volatile__ ("movss %%xmm14, %0 # memcpy_r8_2_r4.c 42" : "=m" (f [i + 14]) : : "memory");
283     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm15 # memcpy_r8_2_r4.c 43" : : "m" (d [i + 15]) : "%xmm15");
284     __asm__ __volatile__ ("movss %%xmm15, %0 # memcpy_r8_2_r4.c 44" : "=m" (f [i + 15]) : : "memory");
285     }
286     }
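// For reference, a portable (if slower) sketch of what the SSE routine above
// computes: an element-wise double -> float narrowing copy, minus the
// prefetching. It is not called anywhere; it only documents the intent.
static void
memcpy_r8_2_r4_portable(float *f, const double *d, long long n)
{
    long long i;
    for (i = 0; i < n; ++i) {
        f[i] = (float) d[i];
    }
}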
287    
// Debug routine: check that the free + in-use buffer counts add up
// to the number of buffers originally allocated.
void
countBufs(int nbufs)
{
291     int nInUse, nFree;
292     bufHdr_t *bufPtr;
293     static int target = -1;
294    
295     if (-1 == target) {
296     ASSERT(-1 != nbufs);
297     target = nbufs;
298     }
299    
300     bufPtr = freeTileBufs_ptr;
301     for (nFree = 0; bufPtr != NULL; bufPtr = bufPtr->next) nFree += 1;
302    
303     bufPtr = inUseTileBufs_ptr;
304     for (nInUse = 0; bufPtr != NULL; bufPtr = bufPtr->next) nInUse += 1;
305    
306     if (nInUse + nFree != target) {
307     fprintf(stderr, "Rank %d: bad number of buffs: free %d, inUse %d, should be %d\n",
308     ioIntracommRank, nFree, nInUse, target);
309     sleep(5000);
310     }
311     }
312    
313     int readn(int fd, void *p, int nbytes)
314     {
315    
316     char *ptr = (char*)(p);
317    
318     int nleft, nread;
319    
320     nleft = nbytes;
321     while (nleft > 0){
322     nread = read(fd, ptr, nleft);
323     if (nread < 0)
324     return(nread); // error
325     else if (nread == 0)
326     break; // EOF
327    
328     nleft -= nread;
329     ptr += nread;
330     }
331     return (nbytes - nleft);
332     }
333    
334    
335     ssize_t writen(int fd, void *p, size_t nbytes)
336     {
337     char *ptr = (char*)(p);
338    
339     size_t nleft;
340     ssize_t nwritten;
341    
342     nleft = nbytes;
343     while (nleft > 0){
344     nwritten = write(fd, ptr, nleft);
345     if (nwritten <= 0){
346     if (errno==EINTR) continue; // POSIX, not SVr4
347     return(nwritten); // non-EINTR error
348     }
349    
350     nleft -= nwritten;
351     ptr += nwritten;
352     }
353     return(nbytes - nleft);
354     }
355    
356    
357     void write_pickup_meta(FILE *fp, int gcmIter, int pickup)
358     {
359     int i;
360     int ndims = 2;
361     int nrecords,nfields;
362     char**f;
363     char*fld1[] = {"Uvel","Vvel","Theta","Salt","GuNm1","GvNm1","EtaN","dEtaHdt","EtaH"};
364     char*fld2[] = {"siTICES","siAREA","siHEFF","siHSNOW","siHSALT","siUICE","siVICE"};
365     // for now, just list the fields here. When the whole field specification apparatus
366     // is cleaned up, pull the names out of the epochstyle definition or whatever
367    
368     ASSERT(1==pickup||2==pickup);
369    
370     if (1==pickup){
371     nrecords = 6*NUM_Z+3;
372     nfields = sizeof(fld1)/sizeof(char*);
373     f = fld1;
374     }
375     else if (2==pickup){
376     nrecords = MULTDIM+6;
377     nfields = sizeof(fld2)/sizeof(char*);
378     f = fld2;
379     }
380    
381     fprintf(fp," nDims = [ %3d ];\n",ndims);
382     fprintf(fp," dimList = [\n");
383     fprintf(fp," %10u,%10d,%10u,\n",NUM_X,1,NUM_X);
384     fprintf(fp," %10ld,%10d,%10ld\n",NUM_Y,1,NUM_Y);
385     fprintf(fp," ];\n");
386     fprintf(fp," dataprec = [ 'float64' ];\n");
387     fprintf(fp," nrecords = [ %5d ];\n",nrecords);
388     fprintf(fp," timeStepNumber = [ %10d ];\n",gcmIter);
389     fprintf(fp," timeInterval = [ %19.12E ];\n",0.0); // what should this be?
390     fprintf(fp," nFlds = [ %4d ];\n",nfields);
391     fprintf(fp," fldList = {\n");
392     for (i=0;i<nfields;++i)
393     fprintf(fp," '%-8s'",f[i]);
394     fprintf(fp,"\n };\n");
395     }
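// For reference, with the sizes defined above (NUM_X = 1080, NUM_Y = 14040,
// NUM_Z = 90), a pickup==1 call writes a .meta file roughly along these lines
// (spacing trimmed; timeInterval is printed in %19.12E format):
//
//    nDims = [   2 ];
//    dimList = [ 1080, 1, 1080, 14040, 1, 14040 ];
//    dataprec = [ 'float64' ];
//    nrecords = [ 543 ];              <- 6*NUM_Z + 3
//    timeStepNumber = [ <gcmIter> ];
//    timeInterval = [ 0.0 ];
//    nFlds = [ 9 ];
//    fldList = { 'Uvel' 'Vvel' 'Theta' 'Salt' 'GuNm1' 'GvNm1' 'EtaN' 'dEtaHdt' 'EtaH' };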
396    
397    
398    
399     double *outBuf=NULL;//[NUM_X*NUM_Y*NUM_Z]; // only needs to be myNumZSlabs
400     size_t outBufSize=0;
401    
402    
403     void
404     do_write(int io_epoch, fieldInfoThisEpoch_t *whichField, int firstZ, int numZ, int gcmIter)
405     {
406     if (0==numZ) return; // this is *not* global NUM_Z : change name of parameter to avoid grief!
407    
408     int pickup = whichField->pickup;
409    
410     ///////////////////////////////
411     // swap here, if necessary
412     // int i,j;
413    
414     //i = NUM_X*NUM_Y*numZ;
415     //mds_byteswapr8_(&i,outBuf);
416    
417     // mds_byteswapr8 expects an integer count, which is gonna overflow someday
418     // can't redefine to long without affecting a bunch of other calls
419     // so do a slab at a time here, to delay the inevitable
420     // i = NUM_X*NUM_Y;
421     //for (j=0;j<numZ;++j)
422     // mds_byteswapr8_(&i,&outBuf[i*j]);
423    
424     // gnu builtin evidently honored by intel compilers
425    
426     if (pickup) {
427     uint64_t *alias = (uint64_t*)outBuf;
428     size_t i;
429     for (i=0;i<NUM_X*NUM_Y*numZ;++i)
430     alias[i] = __builtin_bswap64(alias[i]);
431     }
432     else {
433     uint32_t *alias = (uint32_t*)outBuf;
434     size_t i;
435     for (i=0;i<NUM_X*NUM_Y*numZ;++i)
436     alias[i] = __builtin_bswap32(alias[i]);
437     }
438    
439     // end of swappiness
440     //////////////////////////////////
441    
442     char s[1024];
443     //sprintf(s,"henze_%d_%d_%c.dat",io_epoch,gcmIter,whichField->dataFieldID);
444    
445     sprintf(s,whichField->filenameTemplate,gcmIter,"data");
446    
447     int fd = open(s,O_CREAT|O_WRONLY,S_IRWXU|S_IRGRP);
448     ASSERT(fd!=-1);
449    
450     size_t nbytes;
451    
452     if (pickup) {
453     lseek(fd,whichField->offset,SEEK_SET);
454     lseek(fd,firstZ*NUM_X*NUM_Y*datumSize,SEEK_CUR);
455     nbytes = NUM_X*NUM_Y*numZ*datumSize;
456     }
457     else {
458     lseek(fd,firstZ*NUM_X*NUM_Y*sizeof(float),SEEK_CUR);
459     nbytes = NUM_X*NUM_Y*numZ*sizeof(float);
460     }
461    
462     ssize_t bwrit = writen(fd,outBuf,nbytes);
463    
464     if (-1==bwrit) perror("Henze : file write problem : ");
465    
FPRINTF(stderr,"Wrote %zd of %zu bytes (%d -> %d) \n",bwrit,nbytes,firstZ,numZ);
467    
468     // ASSERT(nbytes == bwrit);
469    
if (nbytes != bwrit)
    fprintf(stderr,"WROTE %zd of %zu bytes\n",bwrit,nbytes);
472    
473     close(fd);
474    
475    
476     return;
477     }
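// The byte swapping above leans on the GNU __builtin_bswap64/__builtin_bswap32
// intrinsics. For a compiler that lacks them, a plain-C fallback would look
// like the sketch below (64-bit case shown); it is not used here and is only
// included for reference.
static uint64_t
bswap64_portable(uint64_t x)
{
    return  ((x & 0x00000000000000ffULL) << 56)
          | ((x & 0x000000000000ff00ULL) << 40)
          | ((x & 0x0000000000ff0000ULL) << 24)
          | ((x & 0x00000000ff000000ULL) <<  8)
          | ((x & 0x000000ff00000000ULL) >>  8)
          | ((x & 0x0000ff0000000000ULL) >> 24)
          | ((x & 0x00ff000000000000ULL) >> 40)
          | ((x & 0xff00000000000000ULL) >> 56);
}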
478    
479    
480     int NTILES = -1;
481    
482     typedef struct {
483     int off;
484     int skip;
485     } tile_layout_t;
486    
487     tile_layout_t *offsetTable;
488    
489     void
490     processSlabSection(
491     fieldInfoThisEpoch_t *whichField,
492     int tileID,
493     void *data,
494     int myNumZSlabs)
495     {
496     int intracommSize,intracommRank;
497     // MPI_Comm_size(whichField->ioRanksIntracomm, &intracommSize);
498     MPI_Comm_rank(whichField->ioRanksIntracomm, &intracommRank);
499     //printf("i/o rank %d/%d recv'd %d::%d (%d->%d)\n",intracommRank,intracommSize,whichField,tileID,firstZ,lastZ);
500     // printf("rank %d : tile %d is gonna go at %d and stride with %d, z = %d\n",intracommRank,tileID,offsetTable[tileID].off,
501     // offsetTable[tileID].skip,myNumZSlabs);
502    
503     int z;
504    
505     int pickup = whichField->pickup;
506    
507     //ASSERT((tileID > 0) && (tileID < (sizeof(offsetTable)/sizeof(tile_layout_t))));
508     ASSERT( (tileID > 0) && (tileID <= NTILES) ); // offsetTable now dynamically allocated
509    
510     if (myNumZSlabs * twoDFieldSizeInBytes > outBufSize){
511    
512     free(outBuf);
513    
514     outBufSize = myNumZSlabs * twoDFieldSizeInBytes;
515    
516     outBuf = malloc(outBufSize);
517     ASSERT(outBuf);
518    
519     memset(outBuf,0,outBufSize);
520     }
521    
522     for (z=0;z<myNumZSlabs;++z){
523    
524     off_t zoffset = z*TILE_X*TILE_Y*NTILES; //NOT totalNumTiles;
525     off_t hoffset = offsetTable[tileID].off;
526     off_t skipdst = offsetTable[tileID].skip;
527    
528     //double *dst = outBuf + zoffset + hoffset;
529     void *dst;
530     if (pickup)
531     dst = outBuf + zoffset + hoffset;
532     else
533     dst = (float*)outBuf + zoffset + hoffset;
534    
535     off_t zoff = z*(TILE_X+2*XGHOSTS)*(TILE_Y+2*YGHOSTS);
536     off_t hoff = YGHOSTS*(TILE_X+2*XGHOSTS) + YGHOSTS;
537     double *src = (double*)data + zoff + hoff;
538    
539     off_t skipsrc = TILE_X+2*XGHOSTS;
540    
541     //fprintf(stderr,"rank %d tile %d offset %d skip %d dst %x src %x\n",intracommRank,tileID,hoffset,skipdst,dst,src);
542    
543     long long n = TILE_X;
544    
545     int y;
546     if (pickup)
547     for (y=0;y<TILE_Y;++y)
548     memcpy((double*)dst + y * skipdst, src + y * skipsrc, TILE_X*datumSize);
549     else
550     for (y=0;y<TILE_Y;++y)
551     memcpy_r8_2_r4((float*)dst + y * skipdst, src + y * skipsrc, &n);
552     }
553    
554     return;
555     }
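// Index bookkeeping in processSlabSection(), spelled out: the incoming slab is
// still ghosted, so one z level of the source occupies
// (TILE_X+2*XGHOSTS)*(TILE_Y+2*YGHOSTS) doubles; `hoff` skips the YGHOSTS
// leading ghost rows plus the leading ghost cells of the first interior row
// (the code adds YGHOSTS there, which implicitly assumes XGHOSTS == YGHOSTS),
// and `skipsrc` is the padded row length. On the output side the assembled
// slab is un-ghosted, so `skipdst` (from the offset table broadcast by the
// compute ranks) is the stride between successive rows of this tile within
// the field. For example, with a hypothetical 30x30 tile and 1-wide ghosts,
// one source z level holds 32*32 values and hoff = 1*32 + 1 = 33.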
556    
557    
558    
559     // Allocate some buffers to receive tile-data from the compute ranks
560     void
561     allocateTileBufs(int numTileBufs, int maxIntracommSize)
562     {
563    
564     ASSERT(tileOneZLevelSizeInBytes>0); // be sure we have rec'vd values by now
565    
566     int i, j;
567     for (i = 0; i < numTileBufs; ++i) {
568    
569     bufHdr_t *newBuf = malloc(sizeof(bufHdr_t));
570     ASSERT(NULL != newBuf);
571    
572     newBuf->payload = malloc(tileOneZLevelSizeInBytes * NUM_Z);
573     ASSERT(NULL != newBuf->payload);
574    
575     newBuf->requests = malloc(maxIntracommSize * sizeof(MPI_Request));
576     ASSERT(NULL != newBuf->requests);
577    
578     // Init some values
579     newBuf->requestsArraySize = maxIntracommSize;
580     for (j = 0; j < maxIntracommSize; ++j) {
581     newBuf->requests[j] = MPI_REQUEST_NULL;
582     }
583     newBuf->ck0 = newBuf->ck1 = 0xdeadbeefdeadbeef;
584    
585     // Put the buf into the free list
586     newBuf->state = bufState_Free;
587     newBuf->next = freeTileBufs_ptr;
588     freeTileBufs_ptr = newBuf;
589     }
590     }
591    
592    
593    
594     bufHdr_t *
595     getFreeBuf()
596     {
597     int j;
598     bufHdr_t *rtnValue = freeTileBufs_ptr;
599    
600     if (NULL != rtnValue) {
601     ASSERT(bufState_Free == rtnValue->state);
602     freeTileBufs_ptr = rtnValue->next;
603     rtnValue->next = NULL;
604    
605     // Paranoia. This should already be the case
606     for (j = 0; j < rtnValue->requestsArraySize; ++j) {
607     rtnValue->requests[j] = MPI_REQUEST_NULL;
608     }
609     }
610     return rtnValue;
611     }
612    
613    
614     /////////////////////////////////////////////////////////////////
615    
616    
617     bufHdr_t *
618     tryToReceiveDataTile(
619     MPI_Comm dataIntercomm,
620     int epochID,
621     size_t expectedMsgSize,
622     int *tileID)
623     {
624     bufHdr_t *bufHdr;
625     int pending, i, count;
626     MPI_Status mpiStatus;
627    
628     MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, dataIntercomm,
629     &pending, &mpiStatus);
630    
631     // If no data are pending, or if we can't get a buffer to
632     // put the pending data into, return NULL
633     if (!pending) return NULL;
634     if (((bufHdr = getFreeBuf()) == NULL)) {
635     FPRINTF(stderr,"tile %d(%d) pending, but no buf to recv it\n",
636     mpiStatus.MPI_TAG, mpiStatus.MPI_TAG >> numCheckBits);
637     return NULL;
638     }
639    
640     // Do sanity checks on the pending message
641     MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
642     ASSERT(count == expectedMsgSize);
643     ASSERT((mpiStatus.MPI_TAG & bitMask) == (epochID & bitMask));
644    
// Receive the data we saw in the iprobe
646     MPI_Recv(bufHdr->payload, expectedMsgSize, MPI_BYTE,
647     mpiStatus.MPI_SOURCE, mpiStatus.MPI_TAG,
648     dataIntercomm, &mpiStatus);
649     bufHdr->state = bufState_InUse;
650    
651     // Overrun check
652     ASSERT(bufHdr->ck0==0xdeadbeefdeadbeef);
653     ASSERT(bufHdr->ck0==bufHdr->ck1);
654    
655     // Return values
656     *tileID = mpiStatus.MPI_TAG >> numCheckBits;
657    
658    
659     FPRINTF(stderr, "recv tile %d(%d) from compute rank %d\n",
660     mpiStatus.MPI_TAG, *tileID, mpiStatus.MPI_SOURCE);
661    
662     return bufHdr;
663     }
664    
665    
666    
667     int
668     tryToReceiveZSlab(
669     void *buf,
670     int expectedMsgSize,
671     MPI_Comm intracomm)
672     {
673     MPI_Status mpiStatus;
674     int pending, count;
675    
676     MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, intracomm, &pending, &mpiStatus);
677     if (!pending) return -1;
678    
679     MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
680     ASSERT(count == expectedMsgSize);
681    
682     MPI_Recv(buf, count, MPI_BYTE, mpiStatus.MPI_SOURCE,
683     mpiStatus.MPI_TAG, intracomm, &mpiStatus);
684    
685     FPRINTF(stderr, "recv slab %d from rank %d\n",
686     mpiStatus.MPI_TAG, mpiStatus.MPI_SOURCE);
687    
688     // return the tileID
689     return mpiStatus.MPI_TAG;
690     }
691    
692    
693    
694     void
695     redistributeZSlabs(
696     bufHdr_t *bufHdr,
697     int tileID,
698     int zSlabsPer,
699     int thisFieldNumZ,
700     MPI_Comm intracomm)
701     {
702     int pieceSize = zSlabsPer * tileOneZLevelSizeInBytes;
703     int tileSizeInBytes = tileOneZLevelSizeInBytes * thisFieldNumZ;
704     int offset = 0;
705     int recvRank = 0;
706    
707     while ((offset + pieceSize) <= tileSizeInBytes) {
708    
709     MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
710     tileID, intracomm, &(bufHdr->requests[recvRank]));
711     ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
712    
713     offset += pieceSize;
714     recvRank += 1;
715     }
716    
717     // There might be one last odd-sized piece
718     if (offset < tileSizeInBytes) {
719     pieceSize = tileSizeInBytes - offset;
720     ASSERT(pieceSize % tileOneZLevelSizeInBytes == 0);
721    
722     MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
723     tileID, intracomm, &(bufHdr->requests[recvRank]));
724     ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
725    
726     offset += pieceSize;
727     recvRank += 1;
728     }
729    
730     // Sanity check
731     ASSERT(recvRank <= bufHdr->requestsArraySize);
732     while (recvRank < bufHdr->requestsArraySize) {
ASSERT(MPI_REQUEST_NULL == bufHdr->requests[recvRank++]);
734     }
735     }
736    
737    
738    
739     /////////////////////////////////////////////////////////////////
740     // This is called by the i/o ranks
741     void
742     doNewEpoch(int epochID, int epochStyleIndex, int gcmIter)
743     {
744     // In an i/o epoch, the i/o ranks are partitioned into groups,
745     // each group dealing with one field. The ranks within a group:
746     // (1) receive data tiles from the compute ranks
747     // (2) slice the received data tiles into slabs in the 'z'
748     // dimension, and redistribute the tile-slabs among the group.
749     // (3) receive redistributed tile-slabs, and reconstruct a complete
750     // field-slab for the whole field.
751     // (4) Write the completed field-slab to disk.
752    
753     fieldInfoThisEpoch_t *fieldInfo;
754     int intracommRank, intracommSize;
755    
756     int zSlabsPer;
757     int myNumZSlabs, myFirstZSlab, myLastZSlab, myNumSlabPiecesToRecv;
758    
759     int numTilesRecvd, numSlabPiecesRecvd;
760    
761    
762     // Find the descriptor for my assigned field for this epoch style.
763     // It is the one whose dataIntercomm is not null
764     fieldInfo = epochStyles[epochStyleIndex];
765     while (MPI_COMM_NULL == fieldInfo->dataIntercomm) ++fieldInfo;
766     ASSERT('\0' != fieldInfo->dataFieldID);
767    
768    
769    
770     MPI_Comm_rank(fieldInfo->ioRanksIntracomm, &intracommRank);
771     MPI_Comm_size(fieldInfo->ioRanksIntracomm, &intracommSize);
772    
773    
774     // Compute which z slab(s) we will be reassembling.
775     zSlabsPer = divCeil(fieldInfo->zDepth, intracommSize);
776     myNumZSlabs = zSlabsPer;
777    
778     // Adjust myNumZSlabs in case it didn't divide evenly
779     myFirstZSlab = intracommRank * myNumZSlabs;
780     if (myFirstZSlab >= fieldInfo->zDepth) {
781     myNumZSlabs = 0;
782     } else if ((myFirstZSlab + myNumZSlabs) > fieldInfo->zDepth) {
783     myNumZSlabs = fieldInfo->zDepth - myFirstZSlab;
784     } else {
785     myNumZSlabs = zSlabsPer;
786     }
787     myLastZSlab = myFirstZSlab + myNumZSlabs - 1;
788    
789     // If we were not assigned any z-slabs, we don't get any redistributed
790     // tile-slabs. If we were assigned one or more slabs, we get
791     // redistributed tile-slabs for every tile.
792     myNumSlabPiecesToRecv = (0 == myNumZSlabs) ? 0 : totalNumTiles;
793    
794    
795     numTilesRecvd = 0;
796     numSlabPiecesRecvd = 0;
797    
798     ////////////////////////////////////////////////////////////////////
799     // Main loop. Handle tiles from the current epoch
800     for(;;) {
801     bufHdr_t *bufHdr;
802    
803     ////////////////////////////////////////////////////////////////
804     // Check for tiles from the computational tasks
805     while (numTilesRecvd < fieldInfo->tileCount) {
806     int tileID = -1;
807     size_t msgSize = tileOneZLevelSizeInBytes * fieldInfo->zDepth;
808     bufHdr = tryToReceiveDataTile(fieldInfo->dataIntercomm, epochID,
809     msgSize, &tileID);
810     if (NULL == bufHdr) break; // No tile was received
811    
812     numTilesRecvd += 1;
813     redistributeZSlabs(bufHdr, tileID, zSlabsPer,
814     fieldInfo->zDepth, fieldInfo->ioRanksIntracomm);
815    
816     // Add the bufHdr to the "in use" list
817     bufHdr->next = inUseTileBufs_ptr;
818     inUseTileBufs_ptr = bufHdr;
819    
820    
821     }
822    
823    
824     ////////////////////////////////////////////////////////////////
825     // Check for tile-slabs redistributed from the i/o processes
826     while (numSlabPiecesRecvd < myNumSlabPiecesToRecv) {
827     int msgSize = tileOneZLevelSizeInBytes * myNumZSlabs;
828     char data[msgSize];
829    
830     int tileID = tryToReceiveZSlab(data, msgSize, fieldInfo->ioRanksIntracomm);
831     if (tileID < 0) break; // No slab was received
832    
833    
834     numSlabPiecesRecvd += 1;
835     processSlabSection(fieldInfo, tileID, data, myNumZSlabs);
836    
837    
838     // Can do the write here, or at the end of the epoch.
839     // Probably want to make it asynchronous (waiting for
840     // completion at the barrier at the start of the next epoch).
841     //if (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) {
842     // ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
843     // do_write(io_epoch,whichField,myFirstZSlab,myNumZSlabs);
844     //}
845     }
846    
847    
848     ////////////////////////////////////////////////////////////////
849     // Check if we can release any of the inUse buffers (i.e. the
850     // isends we used to redistribute those z-slabs have completed).
851     // We do this by detaching the inUse list, then examining each
852     // element in the list, putting each element either into the
853     // free list or back into the inUse list, as appropriate.
854     bufHdr = inUseTileBufs_ptr;
855     inUseTileBufs_ptr = NULL;
856     while (NULL != bufHdr) {
857     int count = 0;
858     int completions[intracommSize];
859     MPI_Status statuses[intracommSize];
860    
861     // Acquire the "next" bufHdr now, before we change anything.
862     bufHdr_t *nextBufHeader = bufHdr->next;
863    
864     // Check to see if the Isend requests have completed for this buf
865     MPI_Testsome(intracommSize, bufHdr->requests,
866     &count, completions, statuses);
867    
868     if (MPI_UNDEFINED == count) {
869     // A return of UNDEFINED means that none of the requests
870     // is still outstanding, i.e. they have all completed.
871     // Put the buf on the free list.
872     bufHdr->state = bufState_Free;
873     bufHdr->next = freeTileBufs_ptr;
874     freeTileBufs_ptr = bufHdr;
875     FPRINTF(stderr,"Rank %d freed a tile buffer\n", intracommRank);
876     } else {
877     // At least one request is still outstanding.
878     // Put the buf on the inUse list.
879     bufHdr->next = inUseTileBufs_ptr;
880     inUseTileBufs_ptr = bufHdr;
881     }
882    
// Continue with the next buffer
884     bufHdr = nextBufHeader;
885     }
886    
887    
888     ////////////////////////////////////////////////////////////////
889     // Check if the epoch is complete
890     if ((numTilesRecvd >= fieldInfo->tileCount) &&
891     (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) &&
892     (NULL == inUseTileBufs_ptr))
893     {
894     ASSERT(numTilesRecvd == fieldInfo->tileCount);
895     ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
896    
897     //fprintf(stderr,"rank %d %d %d %d\n",intracommRank,numTilesRecvd,numSlabPiecesRecvd,myNumZSlabs);
898    
899     // Ok, wrap up the current i/o epoch
900     // Probably want to make this asynchronous (waiting for
901     // completion at the start of the next epoch).
902     do_write(epochID, fieldInfo, myFirstZSlab, myNumZSlabs, gcmIter);
903    
904     // The epoch is complete
905     return;
906     }
907     }
908    
909     }
910    
911    
912    
913    
914    
915    
916    
917     // Main routine for the i/o tasks. Callers DO NOT RETURN from
918     // this routine; they MPI_Finalize and exit.
919     void
920     ioRankMain(int totalNumTiles)
921     {
922     // This is one of a group of i/o processes that will be receiving tiles
923     // from the computational processes. This same process is also
924     // responsible for compositing slices of tiles together into one or
925     // more complete slabs, and then writing those slabs out to disk.
926     //
927     // When a tile is received, it is cut up into slices, and each slice is
928     // sent to the appropriate compositing task that is dealing with those
929     // "z" level(s) (possibly including ourselves). These are tagged with
930     // the tile-id. When we *receive* such a message (from ourselves, or
931     // from any other process in this group), we unpack the data (stripping
932     // the ghost cells), and put them into the slab we are assembling. Once
933     // a slab is fully assembled, it is written to disk.
934    
935     int i, size, count, numTileBufs;
936     MPI_Status status;
937     int currentEpochID = 0;
938     int maxTileCount = 0, max3DTileCount = 0, maxIntracommSize = 0;
939    
940     int numComputeRanks, epochStyleIndex;
941     MPI_Comm_remote_size(globalIntercomm, &numComputeRanks);
942    
943    
944     ////////////////////////////////////////////////////////////
//// get global tile info via a bcast over the global intercomm
946    
947    
948     // int ierr = MPI_Bcast(&NTILES,1,MPI_INT,0,globalIntercomm);
949    
950     int data[9];
951    
952     int ierr = MPI_Bcast(data,9,MPI_INT,0,globalIntercomm);
953    
954     NTILES = data[3];
955     TILE_X = data[5];
956     TILE_Y = data[6];
957     XGHOSTS = data[7];
958     YGHOSTS = data[8];
959    
960     tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
961     tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // also calculated in compute ranks, urghh
962    
963     int *xoff = malloc(NTILES*sizeof(int));
964     ASSERT(xoff);
965     int *yoff = malloc(NTILES*sizeof(int));
966     ASSERT(yoff);
967     int *xskip = malloc(NTILES*sizeof(int));
968     ASSERT(xskip);
969    
970     offsetTable = malloc((NTILES+1)*sizeof(tile_layout_t));
971     ASSERT(offsetTable);
972    
973     ierr = MPI_Bcast(xoff,NTILES,MPI_INT,0,globalIntercomm);
974     ierr = MPI_Bcast(yoff,NTILES,MPI_INT,0,globalIntercomm);
975     ierr = MPI_Bcast(xskip,NTILES,MPI_INT,0,globalIntercomm);
976    
977     for (i=0;i<NTILES;++i){
978     offsetTable[i+1].off = NUM_X*(yoff[i]-1) + xoff[i] - 1;
979     offsetTable[i+1].skip = xskip[i];
980     }
981    
982     free(xoff);
983     free(yoff);
984     free(xskip);
985    
986     ///////////////////////////////////////////////////////////////
987    
988    
989     // At this point, the multitude of various inter-communicators have
990     // been created and stored in the various epoch "style" arrays.
991     // The next step is to have the computational tasks "register" the
992     // data tiles they will send. In this version of the code, we only
993     // track and store *counts*, not the actual tileIDs.
994     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
995     fieldInfoThisEpoch_t *p = NULL;
996     fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
997     int thisIntracommSize;
998    
999     // Find the field we were assigned for this epoch style
1000     int curFieldIndex = -1;
1001     while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
1002     p = &(thisEpochStyle[curFieldIndex]);
1003     if (MPI_COMM_NULL != p->registrationIntercomm) break;
1004     }
1005     ASSERT(NULL != p);
1006     ASSERT('\0' != p->dataFieldID);
1007     MPI_Comm_size(p->ioRanksIntracomm, &size);
1008     if (size > maxIntracommSize) maxIntracommSize = size;
1009    
1010    
1011     // Receive a message from *each* computational rank telling us
1012     // a count of how many tiles that rank will send during epochs
1013     // of this style. Note that some of these counts may be zero
1014     for (i = 0; i < numComputeRanks; ++i) {
1015     MPI_Recv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG,
1016     p->registrationIntercomm, &status);
1017     p->tileCount += status.MPI_TAG;
1018     }
1019     if (p->tileCount > maxTileCount) maxTileCount = p->tileCount;
1020     if (p->zDepth > 1) {
1021     if (p->tileCount > max3DTileCount) max3DTileCount = p->tileCount;
1022     }
1023    
1024     // Sanity check
1025     MPI_Allreduce (&(p->tileCount), &count, 1,
1026     MPI_INT, MPI_SUM, p->ioRanksIntracomm);
1027     ASSERT(count == totalNumTiles);
1028     }
1029    
1030    
1031     // In some as-of-yet-undetermined fashion, we decide how many
1032     // buffers to allocate to hold tiles received from the computational
1033     // tasks. The number of such buffers is more-or-less arbitrary (the
1034     // code should be able to function with any number greater than zero).
1035     // More buffers means more parallelism, but uses more space.
1036     // Note that maxTileCount is an upper bound on the number of buffers
1037     // that we can usefully use. Note also that if we are assigned a 2D
// field, we might be assigned to receive a very large number of tiles
1039     // for that field in that epoch style.
1040    
1041     // Hack - for now, just pick a value for numTileBufs
1042     numTileBufs = (max3DTileCount > 0) ? max3DTileCount : maxTileCount;
1043     if (numTileBufs < 4) numTileBufs = 4;
1044     if (numTileBufs > 15) numTileBufs = 15;
1045     // if (numTileBufs > 25) numTileBufs = 25;
1046    
1047     allocateTileBufs(numTileBufs, maxIntracommSize);
1048     countBufs(numTileBufs);
1049    
1050    
1051     ////////////////////////////////////////////////////////////////////
1052     // Main loop.
1053     ////////////////////////////////////////////////////////////////////
1054    
1055     for (;;) {
1056     int cmd[4];
1057    
1058     // Make sure all the i/o ranks have completed processing the prior
1059     // cmd (i.e. the prior i/o epoch is complete).
1060     MPI_Barrier(ioIntracomm);
1061    
1062     if (0 == ioIntracommRank) {
1063     fprintf(stderr, "I/O ranks waiting for new epoch\n");
1064     MPI_Send(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, globalIntercomm);
1065    
1066     MPI_Recv(cmd, 4, MPI_INT, 0, 0, globalIntercomm, MPI_STATUS_IGNORE);
fprintf(stderr, "I/O ranks beginning new epoch: %d, gcmIter = %d\n", cmd[1],cmd[3]);
1068    
1069     // before we start a new epoch, have i/o rank 0:
1070     // determine output filenames for this epoch
1071     // clean up any extant files with same names
1072     // write .meta files
1073    
1074     if (cmd_exit != cmd[0]){
1075    
1076     fprintf(stderr,"new epoch: epoch %d, style %d, gcmIter %d\n", cmd[1],cmd[2],cmd[3]);
1077    
1078     int epochStyle = cmd[2];
1079     int gcmIter = cmd[3];
1080    
1081     fieldInfoThisEpoch_t *fieldInfo;
1082     fieldInfo = epochStyles[epochStyle];
1083     char s[1024];
1084     int res;
1085     FILE *fp;
1086    
1087     if (fieldInfo->pickup==0){ // for non-pickups, need to loop over individual fields
char f;
while ((f = fieldInfo->dataFieldID) != '\0'){
1090     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1091     fprintf(stderr,"%s\n",s);
1092     res = unlink(s);
1093     if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1094    
1095     // skip writing meta files for non-pickup fields
1096     /*
1097     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
1098     fp = fopen(s,"w+");
1099     fclose(fp);
1100     */
1101    
1102     ++fieldInfo;
1103    
1104     }
1105     }
1106    
1107     else { // single pickup or pickup_seaice file
1108    
1109     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1110     fprintf(stderr,"%s\n",s);
1111     res = unlink(s);
1112     if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1113    
1114     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
fp = fopen(s,"w+");
ASSERT(fp);
1116     write_pickup_meta(fp, gcmIter, fieldInfo->pickup);
1117     fclose(fp);
1118    
1119     }
1120     }
1121     }
1122     MPI_Bcast(cmd, 4, MPI_INT, 0, ioIntracomm);
1123    
1124     switch (cmd[0]) {
1125    
1126     case cmd_exit:
1127     // Don't bother trying to disconnect and free the
1128     // plethora of communicators; just exit.
1129     FPRINTF(stderr,"Received shutdown message\n");
1130     MPI_Finalize();
1131     exit(0);
1132     break;
1133    
1134     case cmd_newEpoch:
1135     if ((currentEpochID + 1) != cmd[1]) {
1136     fprintf(stderr, "ERROR: Missing i/o epoch? was %d, "
1137     "but is now %d ??\n", currentEpochID, cmd[1]);
1138     sleep(1); // Give the message a chance to propagate
1139     abort();
1140     }
1141     currentEpochID = cmd[1];
1142    
1143     memset(outBuf,0,outBufSize); // zero the outBuf, so dry tiles are well defined
1144    
1145     doNewEpoch(cmd[1], cmd[2], cmd[3]);
1146     break;
1147    
1148     default:
1149     fprintf(stderr, "Unexpected epoch command: %d %d %d %d\n",
1150     cmd[0], cmd[1], cmd[2], cmd[3]);
1151     sleep(1);
1152     abort();
1153     break;
1154     }
1155    
1156     }
1157    
1158     }
1159    
1160    
1161    
1162     ///////////////////////////////////////////////////////////////////////////////////
1163    
1164     int
1165     findField(const char c)
1166     {
1167     int i;
1168     for (i = 0; i < numAllFields; ++i) {
1169     if (c == fieldDepths[i].dataFieldID) return i;
1170     }
1171    
1172     // Give the error message a chance to propagate before exiting.
1173     fprintf(stderr, "ERROR: Field not found: '%c'\n", c);
1174     sleep(1);
1175     abort();
1176     }
1177    
1178    
1179    
1180     // Given a number of ranks and a set of fields we will want to output,
1181     // figure out how to distribute the ranks among the fields.
1182     void
1183     computeIORankAssigments(
1184     int numComputeRanks,
1185     int numIORanks,
1186     int numFields,
1187     fieldInfoThisEpoch_t *fields,
1188     int assignments[])
1189     {
1190     int i,j,k, sum, count;
1191    
1192     int numIONodes = numIORanks / numRanksPerNode;
1193     int numIORanksThisField[numFields];
1194     long int bytesPerIORankThisField[numFields];
1195     int expectedMessagesPerRankThisField[numFields];
1196     long int fieldSizes[numFields];
1197    
1198     struct ioNodeInfo_t {
1199     int expectedNumMessagesThisNode;
1200     int numCoresAssigned;
1201     int *assignedFieldThisCore;
1202     } ioNodeInfo[numIONodes];
1203    
1204     // Since numRanksPerNode might be dynamically determined, we have
1205     // to dynamically allocate the assignedFieldThisCore arrays.
1206     for (i = 0; i < numIONodes; ++i) {
1207     ioNodeInfo[i].assignedFieldThisCore = malloc(sizeof(int)*numRanksPerNode);
1208     ASSERT(NULL != ioNodeInfo[i].assignedFieldThisCore);
1209     }
1210    
1211     ASSERT((numIONodes*numRanksPerNode) == numIORanks);
1212    
1213    
1214     //////////////////////////////////////////////////////////////
1215     // Compute the size for each field in this epoch style
1216     for (i = 0; i < numFields; ++i) {
1217     ASSERT((1 == fields[i].zDepth) || (NUM_Z == fields[i].zDepth) || (MULTDIM == fields[i].zDepth));
1218     fieldSizes[i] = twoDFieldSizeInBytes * fields[i].zDepth;
1219     }
1220    
1221    
1222     /////////////////////////////////////////////////////////
1223     // Distribute the available i/o ranks among the fields,
1224     // proportionally based on field size.
1225    
1226     // First, assign one rank per field
1227     ASSERT(numIORanks >= numFields);
1228     for (i = 0; i < numFields; ++i) {
1229     numIORanksThisField[i] = 1;
1230     bytesPerIORankThisField[i] = fieldSizes[i];
1231     }
1232    
1233     // Now apportion any extra ranks
1234     for (; i < numIORanks; ++i) {
1235    
1236     // Find the field 'k' with the most bytesPerIORank
1237     k = 0;
1238     for (j = 1; j < numFields; ++j) {
1239     if (bytesPerIORankThisField[j] > bytesPerIORankThisField[k]) {
1240     k = j;
1241     }
1242     }
1243    
1244     // Assign an i/o rank to that field
1245     numIORanksThisField[k] += 1;
1246     bytesPerIORankThisField[k] = fieldSizes[k] / numIORanksThisField[k];
1247     }
1248    
1249     ////////////////////////////////////////////////////////////
1250     // At this point, all the i/o ranks should be apportioned
1251     // among the fields. Check we didn't screw up the count.
1252     for (sum = 0, i = 0; i < numFields; ++i) {
1253     sum += numIORanksThisField[i];
1254     }
1255     ASSERT(sum == numIORanks);
1256    
1257    
1258    
1259     //////////////////////////////////////////////////////////////////
1260     // The *number* of i/o ranks assigned to a field is based on the
1261     // field size. But the *placement* of those ranks is based on
1262     // the expected number of messages the process will receive.
1263     // [In particular, if we have several fields each with only one
1264     // assigned i/o rank, we do not want to place them all on the
1265     // same node if we don't have to.] This traffic-load balance
1266     // strategy is an approximation at best since the messages may
1267     // not all be the same size (e.g. 2D fields vs. 3D fields), so
1268     // "number of messages" is not the same thing as "traffic".
1269     // But it should suffice.
1270    
1271     // Init a couple of things
1272     for (i = 0; i < numFields; ++i) {
1273     expectedMessagesPerRankThisField[i] =
1274     numComputeRanks / numIORanksThisField[i];
1275     }
1276     for (i = 0; i < numIONodes; ++i) {
1277     ioNodeInfo[i].expectedNumMessagesThisNode = 0;
1278     ioNodeInfo[i].numCoresAssigned = 0;
1279     for (j = 0; j < numRanksPerNode; ++j) {
1280     ioNodeInfo[i].assignedFieldThisCore[j] = -1;
1281     }
1282     }
1283    
1284     ///////////////////////////////////////////////////////////////////
1285     // Select the i/o node with the smallest expectedNumMessages, and
1286     // assign it a rank from the field with the highest remaining
1287     // expectedMessagesPerRank. Repeat until everything is assigned.
1288     // (Yes, this could be a lot faster, but isn't worth the trouble.)
1289    
1290     for (count = 0; count < numIORanks; ++count) {
1291    
1292     // Make an initial choice of node
1293     for (i = 0; i < numIONodes; ++i) {
1294     if (ioNodeInfo[i].numCoresAssigned < numRanksPerNode) break;
1295     }
1296     j = i;
1297     ASSERT(j < numIONodes);
1298    
1299     // Search for a better choice
1300     for (++i; i < numIONodes; ++i) {
1301     if (ioNodeInfo[i].numCoresAssigned >= numRanksPerNode) continue;
1302     if (ioNodeInfo[i].expectedNumMessagesThisNode <
1303     ioNodeInfo[j].expectedNumMessagesThisNode)
1304     {
1305     j = i;
1306     }
1307     }
1308    
1309    
1310     // Make an initial choice of field
1311     for (i = 0; i < numFields; ++i) {
1312     if (numIORanksThisField[i] > 0) break;
1313     }
1314     k = i;
1315     ASSERT(k < numFields);
1316    
1317     // Search for a better choice
1318     for (++i; i < numFields; ++i) {
1319     if (numIORanksThisField[i] <= 0) continue;
1320     if (expectedMessagesPerRankThisField[i] >
1321     expectedMessagesPerRankThisField[k])
1322     {
1323     k = i;
1324     }
1325     }
1326    
1327     // Put one rank from the chosen field onto the chosen node
1328     ioNodeInfo[j].expectedNumMessagesThisNode += expectedMessagesPerRankThisField[k];
1329     ioNodeInfo[j].assignedFieldThisCore[ioNodeInfo[j].numCoresAssigned] = k;
1330     ioNodeInfo[j].numCoresAssigned += 1;
1331     numIORanksThisField[k] -= 1;
1332     }
1333    
1334     // Sanity check - all ranks were assigned to a core
for (i = 0; i < numFields; ++i) {
    ASSERT(0 == numIORanksThisField[i]);
}
// Sanity check - all cores were assigned a rank
for (i = 0; i < numIONodes; ++i) {
    ASSERT(numRanksPerNode == ioNodeInfo[i].numCoresAssigned);
}
1342    
1343    
1344     /////////////////////////////////////
1345     // Return the computed assignments
1346     for (i = 0; i < numIONodes; ++i) {
1347     for (j = 0; j < numRanksPerNode; ++j) {
1348     assignments[i*numRanksPerNode + j] =
1349     ioNodeInfo[i].assignedFieldThisCore[j];
1350     }
1351     }
1352    
1353     // Clean up
1354     for (i = 0; i < numIONodes; ++i) {
1355     free(ioNodeInfo[i].assignedFieldThisCore);
1356     }
1357    
1358     }
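// Worked example of the apportionment above (hypothetical numbers): with four
// fields of sizes {3D, 3D, 2D, 2D} and 6 i/o ranks, every field first gets one
// rank; the two extra ranks then go to the two 3D fields, since a 3D field has
// NUM_Z (or MULTDIM) times the bytes-per-rank of a 2D field. The placement
// pass then spreads the chosen ranks across i/o nodes so that the single-rank
// 2D fields, each of which receives a message from *every* compute rank, do
// not all land on the same node.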
1359    
1360     //////////////////////////////////////////////////////////////////////////////////
1361    
1362    
1363    
1364    
1365     int
1366     isIORank(int commRank, int totalNumNodes, int numIONodes)
1367     {
1368     // Figure out if this rank is on a node that will do i/o.
1369     // Note that the i/o nodes are distributed throughout the
1370     // task, not clustered together.
1371     int ioNodeStride = totalNumNodes / numIONodes;
1372     int thisRankNode = commRank / numRanksPerNode;
1373     int n = thisRankNode / ioNodeStride;
1374     return (((n * ioNodeStride) == thisRankNode) && (n < numIONodes)) ? 1 : 0;
1375     }
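// Example (hypothetical numbers): with 16 ranks per node, 160 total ranks
// (10 nodes) and 2 i/o nodes, ioNodeStride is 10/2 = 5, so nodes 0 and 5
// host the i/o ranks (global ranks 0-15 and 80-95); every other node holds
// only compute ranks.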
1376    
1377    
1378     // Find the number of *physical cores* on the current node
1379     // (ignore hyperthreading). This should work for pretty much
1380     // any Linux based system (and fail horribly for anything else).
1381     int
1382     getNumCores(void)
1383     {
1384     return 20; // until we figure out why this cratered
1385    
1386     char *magic = "cat /proc/cpuinfo | egrep \"core id|physical id\" | "
1387     "tr -d \"\\n\" | sed s/physical/\\\\nphysical/g | "
1388     "grep -v ^$ | sort -u | wc -l";
1389    
1390     FILE *fp = popen(magic,"r");
1391     ASSERT(fp);
1392    
1393     int rtnValue = -1;
1394    
1395     int res = fscanf(fp,"%d",&rtnValue);
1396    
1397     ASSERT(1==res);
1398    
1399     pclose(fp);
1400    
1401     ASSERT(rtnValue > 0);
1402     return rtnValue;
1403     }
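// An alternative sketch that avoids the shell pipeline entirely: ask the OS
// for the number of online processors via sysconf() (declared in <unistd.h>,
// already included above). Note this counts *logical* CPUs, hyperthreads
// included, so it is not a drop-in replacement for the physical-core count
// the pipeline above tries to extract. Not called anywhere; for reference only.
static int
getNumLogicalCPUs(void)
{
    long n = sysconf(_SC_NPROCESSORS_ONLN);
    return (n > 0) ? (int) n : 1;
}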
1404    
1405    
1406    
1407     ////////////////////////////////////////////////////////////////////////
1408     ////////////////////////////////////////////////////////////////////////
1409     // Routines called by the mitGCM code
1410    
1411    
1412     ////////////////////////////////////////
1413     // Various "one time" initializations
1414     void
1415     f1(
1416     MPI_Comm parentComm,
1417     int numComputeRanks,
1418     int numTiles,
1419     MPI_Comm *rtnComputeComm)
1420     {
1421     MPI_Comm newIntracomm, newIntercomm, dupParentComm;
1422     int newIntracommRank, thisIsIORank;
1423     int parentSize, parentRank;
1424     int numIONodes, numIORanks;
1425     int mpiIsInitialized, *intPtr, tagUBexists;
1426     int i, j, nF, compRoot, fieldIndex, epochStyleIndex;
1427     int totalNumNodes, numComputeNodes, newIntracommSize;
1428    
1429     // Init globals
1430     totalNumTiles = numTiles;
1431     if (numRanksPerNode <= 0) {
1432     // Might also want to check for an env var (or something)
1433     numRanksPerNode = getNumCores();
1434     }
1435    
1436     // Fill in the zDepth field of the fieldInfoThisEpoch_t descriptors
1437     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1438     fieldInfoThisEpoch_t *f, *thisEpochStyle = epochStyles[epochStyleIndex];
1439    
1440     int curFieldIndex = -1;
1441     while ((f = &(thisEpochStyle[++curFieldIndex]))->dataFieldID != '\0') {
1442     i = findField(f->dataFieldID);
1443     if (-1 != f->zDepth) {
1444     if (f->zDepth != fieldDepths[i].numZ) {
1445     fprintf(stderr, "Inconsistent z-depth for field '%c': "
1446     "fieldDepths[%d] and epoch style %d\n",
1447     f->dataFieldID, i, epochStyleIndex);
1448     }
1449     }
1450     f->zDepth = fieldDepths[i].numZ;
1451     }
1452     }
1453    
1454    
1455     // Find the max MPI "tag" value
1456     MPI_Attr_get(MPI_COMM_WORLD, MPI_TAG_UB, &intPtr, &tagUBexists);
1457     ASSERT(tagUBexists);
1458     maxTagValue = *intPtr;
1459    
1460    
1461     // Info about the parent communicator
1462     MPI_Initialized(&mpiIsInitialized);
1463     ASSERT(mpiIsInitialized);
1464     MPI_Comm_size(parentComm, &parentSize);
1465     MPI_Comm_rank(parentComm, &parentRank);
1466    
1467    
1468     // Figure out how many nodes we can use for i/o.
1469     // To make things (e.g. memory calculations) simpler, we want
1470     // a node to have either all compute ranks, or all i/o ranks.
1471     // If numComputeRanks does not evenly divide numRanksPerNode, we have
1472     // to round up in favor of the compute side.
1473    
1474     totalNumNodes = divCeil(parentSize, numRanksPerNode);
1475     numComputeNodes = divCeil(numComputeRanks, numRanksPerNode);
1476     numIONodes = (parentSize - numComputeRanks) / numRanksPerNode;
1477     ASSERT(numIONodes > 0);
1478     ASSERT(numIONodes <= (totalNumNodes - numComputeNodes));
1479     numIORanks = numIONodes * numRanksPerNode;
1480    
1481    
1482     // It is surprisingly easy to launch the job with the wrong number
1483     // of ranks, particularly if the number of compute ranks is not a
1484     // multiple of numRanksPerNode (the tendency is to just launch one
1485     // rank per core for all available cores). So we introduce a third
1486     // option for a rank: besides being an i/o rank or a compute rank,
1487     // it might instead be an "excess" rank, in which case it just
1488     // calls MPI_Finalize and exits.
1489    
1490     typedef enum { isCompute, isIO, isExcess } rankType;
1491     rankType thisRanksType;
1492    
1493     if (isIORank(parentRank, totalNumNodes, numIONodes)) {
1494     thisRanksType = isIO;
1495     } else if (parentRank >= numComputeRanks + numIORanks) {
1496     thisRanksType = isExcess;
1497     } else {
1498     thisRanksType = isCompute;
1499     }
1500    
1501     // Split the parent communicator into parts
1502     MPI_Comm_split(parentComm, thisRanksType, parentRank, &newIntracomm);
1503     MPI_Comm_rank(newIntracomm, &newIntracommRank);
1504    
1505     // Excess ranks disappear
1506     // N.B. "parentSize" still includes these ranks.
1507     if (isExcess == thisRanksType) {
1508     MPI_Finalize();
1509     exit(0);
1510     }
1511    
1512     // Sanity check
1513     MPI_Comm_size(newIntracomm, &newIntracommSize);
1514     if (isIO == thisRanksType) {
1515     ASSERT(newIntracommSize == numIORanks);
1516     } else {
1517     ASSERT(newIntracommSize == numComputeRanks);
1518     }
1519    
1520    
1521     // Create a special intercomm from the i/o and compute parts
1522     if (isIO == thisRanksType) {
1523     // Set globals
1524     ioIntracomm = newIntracomm;
1525     MPI_Comm_rank(ioIntracomm, &ioIntracommRank);
1526    
1527     // Find the lowest computation rank
1528     for (i = 0; i < parentSize; ++i) {
1529     if (!isIORank(i, totalNumNodes, numIONodes)) break;
1530     }
1531     } else { // isCompute
1532     // Set globals
1533     computeIntracomm = newIntracomm;
1534     MPI_Comm_rank(computeIntracomm, &computeIntracommRank);
1535    
1536     // Find the lowest IO rank
1537     for (i = 0; i < parentSize; ++i) {
1538     if (isIORank(i, totalNumNodes, numIONodes)) break;
1539     }
1540     }
1541     MPI_Intercomm_create(newIntracomm, 0, parentComm, i, 0, &globalIntercomm);
1542    
1543    
1544    
1545     ///////////////////////////////////////////////////////////////////
1546     // For each different i/o epoch style, split the i/o ranks among
1547     // the fields, and create an inter-communicator for each split.
1548    
1549     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1550     fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1551     int fieldAssignments[numIORanks];
1552     int rankAssignments[numComputeRanks + numIORanks];
1553    
1554     // Count the number of fields in this epoch style
1555     for (nF = 0; thisEpochStyle[nF].dataFieldID != '\0'; ++nF) ;
1556    
1557     // Decide how to apportion the i/o ranks among the fields
1558     for (i=0; i < numIORanks; ++i) fieldAssignments[i] = -1;
1559     computeIORankAssigments(numComputeRanks, numIORanks, nF,
1560     thisEpochStyle, fieldAssignments);
1561     // Sanity check
1562     for (i=0; i < numIORanks; ++i) {
1563     ASSERT((fieldAssignments[i] >= 0) && (fieldAssignments[i] < nF));
1564     }
1565    
1566     // Embed the i/o rank assignments into an array holding
1567     // the assignments for *all* the ranks (i/o and compute).
1568     // Rank assignment of "-1" means "compute".
1569     j = 0;
1570     for (i = 0; i < numComputeRanks + numIORanks; ++i) {
1571     rankAssignments[i] = isIORank(i, totalNumNodes, numIONodes) ?
1572     fieldAssignments[j++] : -1;
1573     }
1574     // Sanity check. Check the assignment for this rank.
1575     if (isIO == thisRanksType) {
1576     ASSERT(fieldAssignments[newIntracommRank] == rankAssignments[parentRank]);
1577     } else {
1578     ASSERT(-1 == rankAssignments[parentRank]);
1579     }
1580     ASSERT(j == numIORanks);
1581    
1582     if (0 == parentRank) {
1583     printf("\ncpu assignments, epoch style %d\n", epochStyleIndex);
1584     for (i = 0; i < numComputeNodes + numIONodes ; ++i) {
1585     if (rankAssignments[i*numRanksPerNode] >= 0) {
1586     // i/o node
1587     for (j = 0; j < numRanksPerNode; ++j) {
1588     printf(" %1d", rankAssignments[i*numRanksPerNode + j]);
1589     }
1590     } else {
1591     // compute node
1592     for (j = 0; j < numRanksPerNode; ++j) {
1593     if ((i*numRanksPerNode + j) >= (numComputeRanks + numIORanks)) break;
1594     ASSERT(-1 == rankAssignments[i*numRanksPerNode + j]);
1595     }
1596     printf(" #");
1597     for (; j < numRanksPerNode; ++j) { // "excess" ranks (if any)
1598     printf("X");
1599     }
1600     }
1601     printf(" ");
1602     }
1603     printf("\n\n");
1604     }
1605    
1606     // Find the lowest rank assigned to computation; use it as
1607     // the "root" for the upcoming intercomm creates.
1608     for (compRoot = 0; rankAssignments[compRoot] != -1; ++compRoot);
1609     // Sanity check
1610     if ((isCompute == thisRanksType) && (0 == newIntracommRank)) ASSERT(compRoot == parentRank);
1611    
1612    
1613     // If this is an I/O rank, split the newIntracomm (which, for
1614     // the i/o ranks, is a communicator among all the i/o ranks)
1615     // into the pieces as assigned.
1616    
1617     if (isIO == thisRanksType) {
1618     MPI_Comm fieldIntracomm;
1619     int myField = rankAssignments[parentRank];
1620     ASSERT(myField >= 0);
1621    
1622     MPI_Comm_split(newIntracomm, myField, parentRank, &fieldIntracomm);
1623     thisEpochStyle[myField].ioRanksIntracomm = fieldIntracomm;
1624    
1625     // Now create an inter-communicator between the computational
1626     // ranks, and each of the newly split off field communicators.
1627     for (i = 0; i < nF; ++i) {
1628    
1629     // Do one field at a time to avoid clashes on parentComm
1630     MPI_Barrier(newIntracomm);
1631    
1632     if (myField != i) continue;
1633    
1634     // Create the intercomm for this field for this epoch style
1635     MPI_Intercomm_create(fieldIntracomm, 0, parentComm,
1636     compRoot, i, &(thisEpochStyle[myField].dataIntercomm));
1637    
1638     // ... and dup a separate one for tile registrations
1639     MPI_Comm_dup(thisEpochStyle[myField].dataIntercomm,
1640     &(thisEpochStyle[myField].registrationIntercomm));
1641     }
1642     }
1643     else {
1644     // This is a computational rank; create the intercommunicators
1645     // to the various split off separate field communicators.
1646    
1647     for (fieldIndex = 0; fieldIndex < nF; ++fieldIndex) {
1648    
1649     // Find the remote "root" process for this field
1650     int fieldRoot = -1;
1651     while (rankAssignments[++fieldRoot] != fieldIndex);
1652    
1653     // Create the intercomm for this field for this epoch style
1654     MPI_Intercomm_create(newIntracomm, 0, parentComm, fieldRoot,
1655     fieldIndex, &(thisEpochStyle[fieldIndex].dataIntercomm));
1656    
1657     // ... and dup a separate one for tile registrations
1658     MPI_Comm_dup(thisEpochStyle[fieldIndex].dataIntercomm,
1659     &(thisEpochStyle[fieldIndex].registrationIntercomm));
1660     }
1661     }
1662    
1663     } // epoch style loop
1664    
1665    
1666     // I/O processes split off and start receiving data
1667     // NOTE: the I/O processes DO NOT RETURN from this call
1668     if (isIO == thisRanksType) ioRankMain(totalNumTiles);
1669    
1670    
1671     // The "compute" processes now return with their new intra-communicator.
1672     *rtnComputeComm = newIntracomm;
1673    
1674     // But first, call the mpiio initialization routine.
1675     initSizesAndTypes();
1676    
1677     return;
1678     }
1679    
1680    
1681    
1682    
1683     // "Register" the tile-id(s) that this process will be sending
1684     void
1685     f2(int tileID)
1686     {
1687     int i, epochStyleIndex;
1688    
1689     static int count = 0;
1690    
1691     // This code assumes that the same tileID will apply to all the
1692     // fields. Note that we use the tileID as a tag, so we require it
1693     // be an int with a legal tag value, but we do *NOT* actually use
1694     // the tag *value* for anything (e.g. it is NOT used as an index
1695     // into an array). It is treated as an opaque handle that is
1696     // passed from the compute task(s) to the compositing routines.
1697     // Those two end-points likely assign meaning to the tileID (i.e.
1698     // use it to identify where the tile belongs within the domain),
1699     // but that is their business.
1700     //
1701     // A tileID of -1 signifies the end of registration.
1702    
1703     ASSERT(computeIntracommRank >= 0);
1704    
1705     if (tileID >= 0) {
1706     // In this version of the code, in addition to the tileID, we also
1707     // multiplex in the low order bit(s) of the epoch number as an
1708     // error check. So the tile value must be small enough to allow that.
1709     ASSERT(((tileID<<numCheckBits) + ((1<<numCheckBits)-1)) < maxTagValue);
1710    
1711     // In this version of the code, we do not send the actual tileID's
1712     // to the i/o processes during the registration procedure. We only
1713     // send a count of the number of tiles that we will send.
1714     ++count;
1715     return;
1716     }
1717    
1718    
1719     // We get here when we are called with a negative tileID, signifying
1720     // the end of registration. We now need to figure out and inform
1721     // *each* i/o process in *each* field just how many tiles we will be
1722     // sending them in *each* epoch style.
1723    
1724     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1725     fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1726    
1727     int curFieldIndex = -1;
1728     while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
1729     fieldInfoThisEpoch_t *thisField = &(thisEpochStyle[curFieldIndex]);
1730     int numRemoteRanks, *tileCounts, remainder;
1731     MPI_Comm_remote_size(thisField->dataIntercomm, &numRemoteRanks);
1732    
1733     tileCounts = alloca(numRemoteRanks * sizeof(int));
1734    
1735     memset(tileCounts,0,numRemoteRanks * sizeof(int));
1736    
1737     // Distribute the tiles among the i/o ranks.
1738     for (i = 0; i < numRemoteRanks; ++i) {
1739     tileCounts[i] = count / numRemoteRanks;
1740     }
1741     // Deal with any remainder
1742     remainder = count - ((count / numRemoteRanks) * numRemoteRanks);
1743     for (i = 0; i < remainder; ++i) {
1744     int target = (computeIntracommRank + i) % numRemoteRanks;
1745     tileCounts[target] += 1;
1746     }
1747    
1748     // Communicate these counts to the i/o processes for this
1749     // field. Note that we send a message to each process,
1750     // even if the count is zero.
1751     for (i = 0; i < numRemoteRanks; ++i) {
1752     MPI_Send(NULL, 0, MPI_BYTE, i, tileCounts[i],
1753     thisField->registrationIntercomm);
1754     }
1755    
1756     } // field loop
1757     } // epoch-style loop
1758    
1759     }
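// The following is a minimal standalone sketch (added for illustration; it is
// not part of the original file, and the function and argument names are
// hypothetical) of the apportionment rule used in f2 above: every remote i/o
// rank gets floor(count / numRemoteRanks) tiles, and the leftover tiles are
// rotated by this compute rank's id so they do not all land on i/o rank 0.
static void
sketchApportionTiles(int count, int numRemoteRanks, int myComputeRank,
                     int *tileCounts)
{
    int i, remainder = count % numRemoteRanks;

    // Even share for every i/o rank ...
    for (i = 0; i < numRemoteRanks; ++i) tileCounts[i] = count / numRemoteRanks;

    // ... then spread the remainder, starting at a rank-dependent offset.
    for (i = 0; i < remainder; ++i) {
        tileCounts[(myComputeRank + i) % numRemoteRanks] += 1;
    }
}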
1760    
1761    
1762    
1763    
1764     int currentEpochID = 0;
1765     int currentEpochStyle = 0;
1766    
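// Called by every compute rank at the start of a new i/o epoch. Compute
// rank 0 first waits for the i/o side to acknowledge that the previous
// epoch is complete, then sends the new epoch's id, style, and gcm
// iteration number; all compute ranks then synchronize before any of them
// starts sending tiles for the new epoch.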
1767     void
1768     beginNewEpoch(int newEpochID, int gcmIter, int epochStyle)
1769     {
1770     fieldInfoThisEpoch_t *p;
1771    
1772     if (newEpochID != (currentEpochID + 1)) {
1773     fprintf(stderr, "ERROR: Missing i/o epoch? was %d, is now %d ??\n",
1774     currentEpochID, newEpochID);
1775     sleep(1); // Give the message a chance to propagate
1776     abort();
1777     }
1778    
1779     ////////////////////////////////////////////////////////////////////////
1780     // Need to be sure the i/o tasks are done processing the previous epoch
1781     // before any compute tasks start sending tiles from a new epoch.
1782    
1783     if (0 == computeIntracommRank) {
1784     int cmd[4] = { cmd_newEpoch, newEpochID, epochStyle, gcmIter };
1785    
1786     // Wait to get an ack that the i/o tasks are done with the prior epoch.
1787     MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
1788     globalIntercomm, MPI_STATUS_IGNORE);
1789    
1790     // Tell the i/o ranks about the new epoch.
1791     // (Just tell rank 0; it will bcast to the others)
1792     MPI_Send(cmd, 4, MPI_INT, 0, 0, globalIntercomm);
1793     }
1794    
1795     // Compute ranks wait here until rank 0 gets the ack from the i/o ranks
1796     MPI_Barrier(computeIntracomm);
1797    
1798    
1799     currentEpochID = newEpochID;
1800     currentEpochStyle = epochStyle;
1801    
1802     // Reset the tileCount (used by f3)
1803     for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
1804     p->tileCount = 0;
1805     }
1806     }
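// For clarity, here is a heavily hedged sketch of the matching steps the i/o
// side's rank 0 is presumed to perform for the handshake above. The real
// logic lives in ioRankMain() and its command dispatch earlier in this file
// and may differ in detail; this block is illustrative only and is not part
// of the original source.
static void
sketchIOSideEpochHandshake(void)
{
    int cmd[4]; // { command, epochID, epochStyle, gcmIter }, as sent above

    // Tell compute rank 0 that the previous epoch has been fully processed.
    MPI_Send(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, globalIntercomm);

    // Receive the next command (cmd_newEpoch or cmd_exit) from compute rank 0 ...
    MPI_Recv(cmd, 4, MPI_INT, 0, 0, globalIntercomm, MPI_STATUS_IGNORE);

    // ... and fan it out to the rest of the i/o ranks.
    MPI_Bcast(cmd, 4, MPI_INT, 0, ioIntracomm);
}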
1807    
1808    
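// Send one tile of field 'dataFieldID' for the given epoch to the i/o rank
// that will composite it. The tileID and the low-order bits of the epochID
// are multiplexed into the MPI tag; the receiving i/o rank is chosen
// round-robin from the field's remote group.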
1809     void
1810     f3(char dataFieldID, int tileID, int epochID, void *data)
1811     {
1812     int whichField, receiver, tag;
1813    
1814     static char priorDataFieldID = '\0';
1815     static int priorEpoch = -1;
1816     static int remoteCommSize;
1817     static fieldInfoThisEpoch_t *p;
1818    
1819     int flag=0;
1820    
1821     // Check that this global has been set
1822     ASSERT(computeIntracommRank >= 0);
1823    
1824    
1825     // Figure out some info about this dataFieldID. It is
1826     // likely to be another tile from the same field as last time
1827     // we were called, in which case we can reuse the "static" values
1828     // retained from the prior call. If not, we need to look it up.
1829    
1830     if ((dataFieldID != priorDataFieldID) || (epochID != priorEpoch)) {
1831    
1832     // It's not the same; we need to look it up.
1833    
1834     for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
1835     if (p->dataFieldID == dataFieldID) break;
1836     }
1837     // Make sure we found a valid field
1838    
1839     ASSERT(p->dataIntercomm != MPI_COMM_NULL);
1840    
1841     MPI_Comm_remote_size(p->dataIntercomm, &remoteCommSize);
1842    
1843     flag=1;
1844    
1845     }
1846    
1847     ASSERT(p->dataFieldID == dataFieldID);
1848    
1849     receiver = (computeIntracommRank + p->tileCount) % remoteCommSize;
1850    
1851     tag = (tileID << numCheckBits) | (epochID & bitMask);
1852    
1853     //fprintf(stderr,"%d %d\n",flag,tileID);
1854    
1855     /*
1856     if (tileID==189){
1857     int i,j;
1858     for (i=0;i<TILE_Y;++i){
1859     for (j=0;j<TILE_X;++j)
1860     fprintf(stderr,"%f ",((double*)data)[872+i*108+j]);
1861     fprintf(stderr,"\n");
1862     }
1863     }
1864    
1865     */
1866    
1867    
1868    
1869     FPRINTF(stderr,"Rank %d sends field '%c', tile %d, to i/o task %d with tag %d(%d)\n",
1870     computeIntracommRank, dataFieldID, tileID, receiver, tag, tag >> numCheckBits);
1871    
1872     MPI_Send(data, tileOneZLevelSizeInBytes * p->zDepth,
1873     MPI_BYTE, receiver, tag, p->dataIntercomm);
1874    
1875     p->tileCount += 1;
1876     priorDataFieldID = dataFieldID;
1877     priorEpoch = epochID;
1878     }
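// Minimal sketch (added for illustration; not part of the original file) of
// how a receiver can split the multiplexed tag that f3 builds above back
// into its two pieces: the opaque tileID and the low-order epoch check bits.
static void
sketchSplitTag(int tag, int *tileID, int *epochCheckBits)
{
    *tileID         = tag >> numCheckBits;  // recover the opaque tile handle
    *epochCheckBits = tag &  bitMask;       // low bits of the epoch id (consistency check)
}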
1879    
1880    
1881    
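// Called once, at the end of the run: compute rank 0 waits for the i/o side
// to finish the final epoch, then sends the cmd_exit command so the i/o
// ranks can shut down.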
1882     void
1883     f4(int epochID)
1884     {
1885     int i;
1886     ASSERT(computeIntracommRank >= 0);
1887    
1888     if (0 == computeIntracommRank) {
1889     int cmd[3] = { cmd_exit, -1, -1 };
1890    
1891     // Recv the ack that the prior i/o epoch is complete
1892     MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
1893     globalIntercomm, MPI_STATUS_IGNORE);
1894    
1895     // Tell the i/o ranks to exit
1896     // Just tell rank 0; it will bcast to the others
1897     MPI_Send(cmd, 3, MPI_INT, 0, 0, globalIntercomm);
1898     }
1899    
1900     }
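// To tie the compute-side entry points together, here is an illustrative
// calling sequence as seen from one compute rank, after f1() has set up the
// communicators. It is not part of the original file; the tile ids, the
// field id 'U', the epoch count, and the gcm iteration numbers are all
// hypothetical.
static void
sketchComputeSideUsage(int numMyTiles, int *myTileIDs, void **myTileData)
{
    int epoch, t;

    // One-time registration: announce each tile, then end with -1.
    for (t = 0; t < numMyTiles; ++t) f2(myTileIDs[t]);
    f2(-1);

    // Per epoch: open the epoch, then send every tile this rank owns
    // (only one hypothetical field, 'U', is shown here).
    for (epoch = 1; epoch <= 3; ++epoch) {
        beginNewEpoch(epoch, /* gcmIter */ epoch * 100, /* epochStyle */ 0);
        for (t = 0; t < numMyTiles; ++t) {
            f3('U', myTileIDs[t], epoch, myTileData[t]);
        }
    }

    // Finally, tell the i/o ranks to exit.
    f4(3);
}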
1901    
1902    
1903    
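// Called (from the Fortran side) on the compute ranks to record the global
// domain and tile geometry, derive the per-tile buffer sizes used by f3,
// and broadcast the nine size parameters to the i/o ranks over the global
// intercommunicator.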
1904     void myasyncio_set_global_sizes_(int *nx, int *ny, int *nz,
1905     int *nt, int *nb,
1906     int *tx, int *ty,
1907     int *ox, int *oy)
1908     {
1909    
1910    
1911     int data[] = {*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy};
1912    
1913     int items = sizeof(data)/sizeof(int);
1914    
1915    
1916     NTILES = *nt; // total number of tiles
1917    
1918    
1919     TILE_X = *tx;
1920     TILE_Y = *ty;
1921     XGHOSTS = *ox;
1922     YGHOSTS = *oy;
1923     tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
1924     tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // needed by compute ranks in f3
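// Worked example (illustrative numbers only, not the actual model
// configuration): with TILE_X = 100, TILE_Y = 100, and 4 ghost points on
// each side, tileOneZLevelItemCount = (100 + 8) * (100 + 8) = 11664 values,
// so tileOneZLevelSizeInBytes = 11664 * 8 = 93312 bytes per z-level.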
1925    
1926    
1927     int rank,ierr;
1928     MPI_Comm_rank(globalIntercomm,&rank);
1929    
1930    
1931     if (0==rank)
1932     printf("%d %d %d\n%d %d\n%d %d\n%d %d\n",*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy);
1933    
1934    
1935     /*
1936     if (0==rank)
1937     ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_ROOT,globalIntercomm);
1938     else
1939     ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1940     */
1941    
1942     if (0==rank)
1943     ierr=MPI_Bcast(data,items,MPI_INT,MPI_ROOT,globalIntercomm);
1944     else
1945     ierr=MPI_Bcast(data,items,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1946    
1947     }
1948    
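// Broadcast the per-tile xoff/yoff/xskip arrays (NTILES ints each) from
// compute rank 0 to the i/o ranks, using the same intercommunicator
// broadcast pattern as the routine above.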
1949     void asyncio_tile_arrays_(int *xoff, int *yoff, int *xskip)
1950     {
1951     int rank,ierr;
1952     MPI_Comm_rank(globalIntercomm,&rank);
1953    
1954     if (0==rank)
1955     ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1956     else
1957     ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1958    
1959     if (0==rank)
1960     ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1961     else
1962     ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1963    
1964     if (0==rank)
1965     ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1966     else
1967     ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1968    
1969     }
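// The two routines above repeat the standard intercommunicator broadcast
// idiom: in the sending (local) group, rank 0 passes MPI_ROOT and every
// other rank passes MPI_PROC_NULL, while the ranks of the remote group name
// the sender's root to receive. A minimal helper capturing just the sending
// side is sketched below (illustrative only; not part of the original file).
static int
sketchBcastToRemoteGroup(void *buf, int count, MPI_Datatype type,
                         int myLocalRank, MPI_Comm intercomm)
{
    int root = (0 == myLocalRank) ? MPI_ROOT : MPI_PROC_NULL;
    return MPI_Bcast(buf, count, type, root, intercomm);
}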
1970    
1971    
1972    
1973    
1974    
1975     //////////////////////////////////////////////////////////////////////
1976     // Fortran interface
1977    
1978     void
1979     bron_f1(
1980     MPI_Fint *ptr_parentComm,
1981     int *ptr_numComputeRanks,
1982     int *ptr_totalNumTiles,
1983     MPI_Fint *rtnComputeComm)
1984     {
1985     // Convert the Fortran handle into a C handle
1986     MPI_Comm newComm, parentComm = MPI_Comm_f2c(*ptr_parentComm);
1987    
1988     f1(parentComm, *ptr_numComputeRanks, *ptr_totalNumTiles, &newComm);
1989    
1990     // Convert the C handle into a Fortran handle
1991     *rtnComputeComm = MPI_Comm_c2f(newComm);
1992     }
1993    
1994    
1995    
1996     void
1997     bron_f2(int *ptr_tileID)
1998     {
1999     f2(*ptr_tileID);
2000     }
2001    
2002    
2003     void
2004     beginnewepoch_(int *newEpochID, int *gcmIter, int *epochStyle)
2005     {
2006     beginNewEpoch(*newEpochID, *gcmIter, *epochStyle);
2007     }
2008    
2009    
2010     void
2011     bron_f3(char *ptr_dataFieldID, int *ptr_tileID, int *ptr_epochID, void *data)
2012     {
2013     f3(*ptr_dataFieldID, *ptr_tileID, *ptr_epochID, data);
2014     }
2015    
2016    
2017    
2018     void
2019     bron_f4(int *ptr_epochID)
2020     {
2021     f4(*ptr_epochID);
2022     }
2023    
