Annotation of /MITgcm_contrib/llc_hires/llc_90/code-async/recvTask.c


Revision 1.2
Tue Oct 3 02:33:27 2017 UTC by dimitri
Branch: MAIN
Changes since 1.1: +3 -3 lines
File MIME type: text/plain
This code and these instructions were tested to work on Pleiades with checkpoint65v.
There is something not quite right about the checkpoint65v dumpfreq output:
for example, 2D output files come out 421080 bytes long instead of 421200.

1 dimitri 1.1
2     // Code to do the m-on-n fan-in, recompositing, and output of data
3     // tiles for MITgcm.
4     //
5     // There are aspects of this code that would be simpler in C++, but
6     // we deliberately wrote the code in ANSI C to make linking it with
7     // the Fortran main code easier (it should be possible to just include
8     // the .o on the link line).
9    
10    
11     #include "PACKAGES_CONFIG.h"
12    
13     #include <stdio.h>
14     #include <unistd.h>
15     #include <stdlib.h>
16     #include <string.h>
17     #include <sys/types.h>
18     #include <sys/stat.h>
19     #include <fcntl.h>
20    
21     #include <mpi.h>
22     #include <alloca.h>
23    
24    
25     #include <stdint.h>
26     #include <errno.h>
27    
28     #define DEBUG 1
29    
30     #if (DEBUG >= 2)
31     #define FPRINTF fprintf
32     #else
33     #include <stdarg.h>
34     void FPRINTF(FILE *fp,...){return;}
35     #endif
36    
37    
38     // Define our own version of "assert", where we sleep rather than abort.
39     // This makes it easier to attach the debugger.
40     #if (DEBUG >= 1)
41     #define ASSERT(_expr) \
42     if (!(_expr)) { \
43     fprintf(stderr, "ASSERT failed for pid %d : `%s': %s: %d: %s\n", \
44     getpid(), #_expr, __FILE__, __LINE__, __func__); \
45     sleep(9999); \
46     }
47     #else
       #include <assert.h>   /* the standard assert() is needed for this fallback */
48     #define ASSERT assert
49     #endif
50    
51    
52    
53     // If numRanksPerNode is set to be > 0, we just use that setting.
54     // If it is <= 0, we dynamically determine the number of cores on
55     // a node, and use that. (N.B.: we assume *all* nodes being used in
56     // the run have the *same* number of cores.)
57     int numRanksPerNode = 0;
58    
59     // Just an error check; can be zero (if you're confident it's all correct).
60     #define numCheckBits 2
61     // This bitMask definition works for 2's complement
62     #define bitMask ((1 << numCheckBits) - 1)
63    
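// A minimal sketch of the tag packing these check bits imply.  Only the
// decoding appears in this file (see tryToReceiveDataTile below); the
// compute-side encoding is an assumption, but presumably looks like this:
static int packTileTag(int tileID, int epochID)
{
    // low numCheckBits carry the epoch check bits, the rest carry the tileID
    return (tileID << numCheckBits) | (epochID & bitMask);
}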
64    
65     ///////////////////////////////////////////////////////////////////////
66     // Info about the data fields
67    
68     // double for now. Might also be float someday
69     #define datumType double
70     #define datumSize (sizeof(datumType))
71    
72    
73     // Info about the data fields. We assume that all the fields have the
74     // same X and Y characteristics, but may have either 1 or NUM_Z levels.
75     //
76     // the following 3 values are required here. Could be sent over or read in
77     // with some rearrangement in init routines
78     //
79 dimitri 1.2 #define NUM_X 90
80     #define NUM_Y 1170L // get rid of this someday
81     #define NUM_Z 50
82 dimitri 1.1 #define MULTDIM 7
83     #define twoDFieldSizeInBytes (NUM_X * NUM_Y * 1 * datumSize)
84     #define threeDFieldSizeInBytes (twoDFieldSizeInBytes * NUM_Z)
85     #define multDFieldSizeInBytes (twoDFieldSizeInBytes * MULTDIM)
86    
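// For the llc_90 sizes above these work out to (datumSize == 8):
//   twoDFieldSizeInBytes   = 90 * 1170 * 8 =    842,400 bytes
//   threeDFieldSizeInBytes = 842,400 * 50  = 42,120,000 bytes
//   multDFieldSizeInBytes  = 842,400 * 7   =  5,896,800 bytes
// and a single-precision 2-D output field is 90 * 1170 * 4 = 421,200 bytes
// (the figure mentioned in the revision log above).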
87     // Info about the data tiles. We assume that all the tiles are the same
88     // size (no odd-sized last piece), they all have the same X and Y
89     // characteristics (including ghosting), and are full depth in Z
90     // (either 1, or NUM_Z, or MULTDIM, as appropriate).
91     //
92     // all these data now sent over from compute ranks
93     //
94     int TILE_X = -1;
95     int TILE_Y = -1;
96     int XGHOSTS = -1;
97     int YGHOSTS = -1;
98    
99     // Size of one Z level of a tile (NOT one Z level of a whole field)
100     int tileOneZLevelItemCount = -1;
101     int tileOneZLevelSizeInBytes = -1;
102    
103    
104     typedef struct dataFieldDepth {
105     char dataFieldID;
106     int numZ;
107     } dataFieldDepth_t;
108    
109     dataFieldDepth_t fieldDepths[] = {
110     { 'A', MULTDIM }, // seaice, 7 == MULTDIM in SEAICE_SIZE.h
111    
112     { 'B', 1 },
113     { 'C', 1 },
114     { 'D', 1 },
115     { 'E', 1 },
116     { 'F', 1 },
117     { 'G', 1 },
118     { 'H', 1 },
119     { 'I', 1 },
120     { 'J', 1 },
121     { 'K', 1 },
122     { 'L', 1 },
123     { 'M', 1 },
124     { 'N', 1 },
125     { 'O', 1 },
126     { 'P', 1 },
127     { 'Q', 1 },
128     { 'R', 1 },
129    
130     { 'S', NUM_Z },
131     { 'T', NUM_Z },
132     { 'U', NUM_Z },
133     { 'V', NUM_Z },
134     { 'W', NUM_Z },
135     { 'X', NUM_Z },
136     { 'Y', NUM_Z },
137     };
138     #define numAllFields (sizeof(fieldDepths)/sizeof(dataFieldDepth_t))
139    
140    
141    
142     ///////////////////////////////////////////////////////////////////////
143     // Info about the various kinds of i/o epochs
144     typedef struct epochFieldInfo {
145     char dataFieldID;
146     MPI_Comm registrationIntercomm;
147     MPI_Comm dataIntercomm;
148     MPI_Comm ioRanksIntracomm;
149     int tileCount;
150     int zDepth; // duplicates the fieldDepth entry; filled in automatically
151     char filenameTemplate[128];
152     long int offset;
153     int pickup;
154     } fieldInfoThisEpoch_t;
155    
156     // The normal i/o dump
157     fieldInfoThisEpoch_t fieldsForEpochStyle_0[] = {
158     { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "U.%010d.%s", 0, 0 },
159     { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "V.%010d.%s", 0, 0 },
160     { 'W', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "W.%010d.%s", 0, 0 },
161     { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Salt.%010d.%s", 0, 0 },
162     { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Theta.%010d.%s", 0,0 },
163     { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Eta.%010d.%s", 0,0 },
164    
165     { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIarea.%010d.%s", 0,0 },
166     { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIheff.%010d.%s", 0,0 },
167     { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIhsnow.%010d.%s", 0,0 },
168     { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIuice.%010d.%s", 0,0 },
169     { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIvice.%010d.%s", 0,0 },
170     { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIhsalt.%010d.%s", 0,0 },
171    
172     //{ 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "EtaHnm1.%010d.%s", 0,0 },
173     { 'I', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceTAUX.%010d.%s", 0,0 },
174     { 'J', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceTAUY.%010d.%s", 0,0 },
175     { 'K', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "KPPhbl.%010d.%s", 0,0 },
176     { 'L', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceSflux.%010d.%s", 0,0 },
177    
178     { 'M', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceFWflx.%010d.%s", 0,0 },
179     { 'O', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceQnet.%010d.%s", 0,0 },
180     { 'P', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "PhiBot.%010d.%s", 0,0 },
181     { 'Q', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceQsw.%010d.%s", 0,0 },
182     //{ 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "dEtaHdt.%010d.%s", 0,0 },
183    
184     {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0,0 },
185     };
186    
187    
188     // pickup file
189     fieldInfoThisEpoch_t fieldsForEpochStyle_1[] = {
190     { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 1},
191     { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 1},
192     { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 2 + twoDFieldSizeInBytes * 0, 1},
193     { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 3 + twoDFieldSizeInBytes * 0, 1},
194     { 'X', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 4 + twoDFieldSizeInBytes * 0, 1},
195     { 'Y', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 5 + twoDFieldSizeInBytes * 0, 1},
196     { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 0, 1},
197     { 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 1, 1},
198     { 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 2, 1},
199     {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 ,1},
200     };
201    
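// The offsets above are running byte counts through the pickup file:
// 'U' starts at 0, 'V' at 1 * threeDFieldSizeInBytes (42,120,000), and so on
// through the six 3-D fields; the three 2-D fields follow, e.g. 'N' (EtaN) at
// 6 * 42,120,000 = 252,720,000 and 'H' (EtaH) at 252,720,000 + 2 * 842,400 =
// 254,404,800.  Total file size: 6*threeD + 3*twoD = 255,247,200 bytes, i.e.
// 6*NUM_Z + 3 = 303 records of one 2-D level each (cf. write_pickup_meta).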
202    
203     // seaice pickup
204     fieldInfoThisEpoch_t fieldsForEpochStyle_2[] = {
205     { 'A', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 2},
206     { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 2},
207     { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 1, 2},
208     { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 2, 2},
209     { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 3, 2},
210     { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 4, 2},
211     { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 5, 2},
212     {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 },
213     };
214    
215    
216     fieldInfoThisEpoch_t *epochStyles[] = {
217     fieldsForEpochStyle_0,
218     fieldsForEpochStyle_1,
219     fieldsForEpochStyle_2,
220     };
221     int numEpochStyles = (sizeof(epochStyles) / sizeof(fieldInfoThisEpoch_t *));
222    
223    
224     typedef enum {
225     cmd_illegal,
226     cmd_newEpoch,
227     cmd_epochComplete,
228     cmd_exit,
229     } epochCmd_t;
230    
231    
232     ///////////////////////////////////////////////////////////////////////
233    
234     // Note that a rank will only have access to one of the Intracomms,
235     // but all ranks will define the Intercomm.
236     MPI_Comm ioIntracomm = MPI_COMM_NULL;
237     int ioIntracommRank = -1;
238     MPI_Comm computeIntracomm = MPI_COMM_NULL;
239     int computeIntracommRank = -1;
240     MPI_Comm globalIntercomm = MPI_COMM_NULL;
241    
242    
243     #define divCeil(_x,_y) (((_x) + ((_y) - 1)) / (_y))
244     #define roundUp(_x,_y) (divCeil((_x),(_y)) * (_y))
245    
246    
247    
248     typedef enum {
249     bufState_illegal,
250     bufState_Free,
251     bufState_InUse,
252     } bufState_t;
253    
254    
255     typedef struct buf_header{
256     struct buf_header *next;
257     bufState_t state;
258     int requestsArraySize;
259     MPI_Request *requests;
260     uint64_t ck0;
261     char *payload; // [tileOneZLevelSizeInBytes * NUM_Z]; // Max payload size
262     uint64_t ck1; // now rec'vd from compute ranks & dynamically allocated in
263     } bufHdr_t; // allocateTileBufs
264    
265    
266     bufHdr_t *freeTileBufs_ptr = NULL;
267     bufHdr_t *inUseTileBufs_ptr = NULL;
268    
269     int maxTagValue = -1;
270     int totalNumTiles = -1;
271    
272     // routine to convert double to float during memcpy
273     // need to get byteswapping in here as well
274     void memcpy_r8_2_r4 (float *f, double *d, long long *n)
275     {
276     long long i, rem;
277     rem = *n%16LL;
278     for (i = 0; i < rem; i++) {
279     f [i] = d [i];
280     }
281     for (i = rem; i < *n; i += 16) {
282     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 10" : : "m" (d [i + 256 + 0]) );
283     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 11" : : "m" (f [i + 256 + 0]) );
284     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm0 # memcpy_r8_2_r4.c 12" : : "m" (d [i + 0]) : "%xmm0");
285     __asm__ __volatile__ ("movss %%xmm0, %0 # memcpy_r8_2_r4.c 13" : "=m" (f [i + 0]) : : "memory");
286     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm1 # memcpy_r8_2_r4.c 14" : : "m" (d [i + 1]) : "%xmm1");
287     __asm__ __volatile__ ("movss %%xmm1, %0 # memcpy_r8_2_r4.c 15" : "=m" (f [i + 1]) : : "memory");
288     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm2 # memcpy_r8_2_r4.c 16" : : "m" (d [i + 2]) : "%xmm2");
289     __asm__ __volatile__ ("movss %%xmm2, %0 # memcpy_r8_2_r4.c 17" : "=m" (f [i + 2]) : : "memory");
290     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm3 # memcpy_r8_2_r4.c 18" : : "m" (d [i + 3]) : "%xmm3");
291     __asm__ __volatile__ ("movss %%xmm3, %0 # memcpy_r8_2_r4.c 19" : "=m" (f [i + 3]) : : "memory");
292     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm4 # memcpy_r8_2_r4.c 20" : : "m" (d [i + 4]) : "%xmm4");
293     __asm__ __volatile__ ("movss %%xmm4, %0 # memcpy_r8_2_r4.c 21" : "=m" (f [i + 4]) : : "memory");
294     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm5 # memcpy_r8_2_r4.c 22" : : "m" (d [i + 5]) : "%xmm5");
295     __asm__ __volatile__ ("movss %%xmm5, %0 # memcpy_r8_2_r4.c 23" : "=m" (f [i + 5]) : : "memory");
296     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm6 # memcpy_r8_2_r4.c 24" : : "m" (d [i + 6]) : "%xmm6");
297     __asm__ __volatile__ ("movss %%xmm6, %0 # memcpy_r8_2_r4.c 25" : "=m" (f [i + 6]) : : "memory");
298     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm7 # memcpy_r8_2_r4.c 26" : : "m" (d [i + 7]) : "%xmm7");
299     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 27" : : "m" (d [i + 256 + 8 + 0]) );
300     __asm__ __volatile__ ("movss %%xmm7, %0 # memcpy_r8_2_r4.c 28" : "=m" (f [i + 7]) : : "memory");
301     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm8 # memcpy_r8_2_r4.c 29" : : "m" (d [i + 8]) : "%xmm8");
302     __asm__ __volatile__ ("movss %%xmm8, %0 # memcpy_r8_2_r4.c 30" : "=m" (f [i + 8]) : : "memory");
303     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm9 # memcpy_r8_2_r4.c 31" : : "m" (d [i + 9]) : "%xmm9");
304     __asm__ __volatile__ ("movss %%xmm9, %0 # memcpy_r8_2_r4.c 32" : "=m" (f [i + 9]) : : "memory");
305     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm10 # memcpy_r8_2_r4.c 33" : : "m" (d [i + 10]) : "%xmm10");
306     __asm__ __volatile__ ("movss %%xmm10, %0 # memcpy_r8_2_r4.c 34" : "=m" (f [i + 10]) : : "memory");
307     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm11 # memcpy_r8_2_r4.c 35" : : "m" (d [i + 11]) : "%xmm11");
308     __asm__ __volatile__ ("movss %%xmm11, %0 # memcpy_r8_2_r4.c 36" : "=m" (f [i + 11]) : : "memory");
309     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm12 # memcpy_r8_2_r4.c 37" : : "m" (d [i + 12]) : "%xmm12");
310     __asm__ __volatile__ ("movss %%xmm12, %0 # memcpy_r8_2_r4.c 38" : "=m" (f [i + 12]) : : "memory");
311     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm13 # memcpy_r8_2_r4.c 39" : : "m" (d [i + 13]) : "%xmm13");
312     __asm__ __volatile__ ("movss %%xmm13, %0 # memcpy_r8_2_r4.c 40" : "=m" (f [i + 13]) : : "memory");
313     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm14 # memcpy_r8_2_r4.c 41" : : "m" (d [i + 14]) : "%xmm14");
314     __asm__ __volatile__ ("movss %%xmm14, %0 # memcpy_r8_2_r4.c 42" : "=m" (f [i + 14]) : : "memory");
315     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm15 # memcpy_r8_2_r4.c 43" : : "m" (d [i + 15]) : "%xmm15");
316     __asm__ __volatile__ ("movss %%xmm15, %0 # memcpy_r8_2_r4.c 44" : "=m" (f [i + 15]) : : "memory");
317     }
318     }
319    
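// For reference, a plain-C sketch of what the SSE loop above does
// (straight double-to-float narrowing; like the routine above, it leaves
// the byte-swapping to be handled elsewhere):
static void memcpy_r8_2_r4_plain(float *f, const double *d, long long n)
{
    long long i;
    for (i = 0; i < n; ++i)
        f[i] = (float) d[i];   // narrow each 8-byte value to 4 bytes
}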
320     // Debug routine
321     void countBufs(int nbufs)
322     {
323     int nInUse, nFree;
324     bufHdr_t *bufPtr;
325     static int target = -1;
326    
327     if (-1 == target) {
328     ASSERT(-1 != nbufs);
329     target = nbufs;
330     }
331    
332     bufPtr = freeTileBufs_ptr;
333     for (nFree = 0; bufPtr != NULL; bufPtr = bufPtr->next) nFree += 1;
334    
335     bufPtr = inUseTileBufs_ptr;
336     for (nInUse = 0; bufPtr != NULL; bufPtr = bufPtr->next) nInUse += 1;
337    
338     if (nInUse + nFree != target) {
339     fprintf(stderr, "Rank %d: bad number of buffs: free %d, inUse %d, should be %d\n",
340     ioIntracommRank, nFree, nInUse, target);
341     sleep(5000);
342     }
343     }
344    
345     int readn(int fd, void *p, int nbytes)
346     {
347    
348     char *ptr = (char*)(p);
349    
350     int nleft, nread;
351    
352     nleft = nbytes;
353     while (nleft > 0){
354     nread = read(fd, ptr, nleft);
355     if (nread < 0)
356     return(nread); // error
357     else if (nread == 0)
358     break; // EOF
359    
360     nleft -= nread;
361     ptr += nread;
362     }
363     return (nbytes - nleft);
364     }
365    
366    
367     ssize_t writen(int fd, void *p, size_t nbytes)
368     {
369     char *ptr = (char*)(p);
370    
371     size_t nleft;
372     ssize_t nwritten;
373    
374     nleft = nbytes;
375     while (nleft > 0){
376     nwritten = write(fd, ptr, nleft);
377     if (nwritten <= 0){
378     if (errno==EINTR) continue; // POSIX, not SVr4
379     return(nwritten); // non-EINTR error
380     }
381    
382     nleft -= nwritten;
383     ptr += nwritten;
384     }
385     return(nbytes - nleft);
386     }
387    
388    
389     void write_pickup_meta(FILE *fp, int gcmIter, int pickup)
390     {
391     int i;
392     int ndims = 2;
393     int nrecords,nfields;
394     char**f;
395     char*fld1[] = {"Uvel","Vvel","Theta","Salt","GuNm1","GvNm1","EtaN","dEtaHdt","EtaH"};
396     char*fld2[] = {"siTICES","siAREA","siHEFF","siHSNOW","siHSALT","siUICE","siVICE"};
397     // for now, just list the fields here. When the whole field specification apparatus
398     // is cleaned up, pull the names out of the epochstyle definition or whatever
399    
400     ASSERT(1==pickup||2==pickup);
401    
402     if (1==pickup){
403     nrecords = 6*NUM_Z+3;
404     nfields = sizeof(fld1)/sizeof(char*);
405     f = fld1;
406     }
407     else if (2==pickup){
408     nrecords = MULTDIM+6;
409     nfields = sizeof(fld2)/sizeof(char*);
410     f = fld2;
411     }
412    
413     fprintf(fp," nDims = [ %3d ];\n",ndims);
414     fprintf(fp," dimList = [\n");
415     fprintf(fp," %10d,%10d,%10d,\n",NUM_X,1,NUM_X);
416     fprintf(fp," %10ld,%10d,%10ld\n",NUM_Y,1,NUM_Y);
417     fprintf(fp," ];\n");
418     fprintf(fp," dataprec = [ 'float64' ];\n");
419     fprintf(fp," nrecords = [ %5d ];\n",nrecords);
420     fprintf(fp," timeStepNumber = [ %10d ];\n",gcmIter);
421     fprintf(fp," timeInterval = [ %19.12E ];\n",0.0); // what should this be?
422     fprintf(fp," nFlds = [ %4d ];\n",nfields);
423     fprintf(fp," fldList = {\n");
424     for (i=0;i<nfields;++i)
425     fprintf(fp," '%-8s'",f[i]);
426     fprintf(fp,"\n };\n");
427     }
428    
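// For the ocean pickup (pickup == 1) the routine above writes, roughly
// (spacing and line breaks approximate):
//     nDims = [   2 ];
//     dimList = [ 90, 1, 90,  1170, 1, 1170 ];
//     dataprec = [ 'float64' ];
//     nrecords = [ 303 ];          // 6*NUM_Z + 3
//     timeStepNumber = [ <gcmIter> ];
//     nFlds = [ 9 ];
//     fldList = { 'Uvel' 'Vvel' 'Theta' 'Salt' 'GuNm1' 'GvNm1' 'EtaN' 'dEtaHdt' 'EtaH' }
// The seaice pickup case is analogous, with nrecords = MULTDIM + 6 = 13
// and the si* field names.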
429    
430    
431     double *outBuf=NULL;//[NUM_X*NUM_Y*NUM_Z]; // only needs to be myNumZSlabs
432     size_t outBufSize=0;
433    
434    
435     void
436     do_write(int io_epoch, fieldInfoThisEpoch_t *whichField, int firstZ, int numZ, int gcmIter)
437     {
438     if (0==numZ) return; // this is *not* global NUM_Z : change name of parameter to avoid grief!
439    
440     int pickup = whichField->pickup;
441    
442     ///////////////////////////////
443     // swap here, if necessary
444     // int i,j;
445    
446     //i = NUM_X*NUM_Y*numZ;
447     //mds_byteswapr8_(&i,outBuf);
448    
449     // mds_byteswapr8 expects an integer count, which is gonna overflow someday
450     // can't redefine to long without affecting a bunch of other calls
451     // so do a slab at a time here, to delay the inevitable
452     // i = NUM_X*NUM_Y;
453     //for (j=0;j<numZ;++j)
454     // mds_byteswapr8_(&i,&outBuf[i*j]);
455    
456     // gnu builtin evidently honored by intel compilers
457    
458     if (pickup) {
459     uint64_t *alias = (uint64_t*)outBuf;
460     size_t i;
461     for (i=0;i<NUM_X*NUM_Y*numZ;++i)
462     alias[i] = __builtin_bswap64(alias[i]);
463     }
464     else {
465     uint32_t *alias = (uint32_t*)outBuf;
466     size_t i;
467     for (i=0;i<NUM_X*NUM_Y*numZ;++i)
468     alias[i] = __builtin_bswap32(alias[i]);
469     }
470    
471     // end of swappiness
472     //////////////////////////////////
473    
474     char s[1024];
475     //sprintf(s,"henze_%d_%d_%c.dat",io_epoch,gcmIter,whichField->dataFieldID);
476    
477     sprintf(s,whichField->filenameTemplate,gcmIter,"data");
478    
479     int fd = open(s,O_CREAT|O_WRONLY,S_IRWXU|S_IRGRP);
480     ASSERT(fd!=-1);
481    
482     size_t nbytes;
483    
484     if (pickup) {
485     lseek(fd,whichField->offset,SEEK_SET);
486     lseek(fd,firstZ*NUM_X*NUM_Y*datumSize,SEEK_CUR);
487     nbytes = NUM_X*NUM_Y*numZ*datumSize;
488     }
489     else {
490     lseek(fd,firstZ*NUM_X*NUM_Y*sizeof(float),SEEK_CUR);
491     nbytes = NUM_X*NUM_Y*numZ*sizeof(float);
492     }
493    
494     ssize_t bwrit = writen(fd,outBuf,nbytes);
495    
496     if (-1==bwrit) perror("Henze : file write problem : ");
497    
498     FPRINTF(stderr,"Wrote %zd of %zu bytes (%d -> %d) \n",bwrit,nbytes,firstZ,numZ);
499    
500     // ASSERT(nbytes == bwrit);
501    
502     if (nbytes!=bwrit)
503     fprintf(stderr,"WROTE %zd / %zu\n",bwrit,nbytes);
504    
505     close(fd);
506    
507    
508     return;
509     }
510    
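// A fallback sketch equivalent to the __builtin_bswap64 used above, for
// compilers without the GNU builtin (the 32-bit case is analogous):
static uint64_t bswap64_plain(uint64_t x)
{
    x = ((x & 0x00000000FFFFFFFFULL) << 32) | ((x & 0xFFFFFFFF00000000ULL) >> 32);
    x = ((x & 0x0000FFFF0000FFFFULL) << 16) | ((x & 0xFFFF0000FFFF0000ULL) >> 16);
    x = ((x & 0x00FF00FF00FF00FFULL) <<  8) | ((x & 0xFF00FF00FF00FF00ULL) >>  8);
    return x;
}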
511    
512     int NTILES = -1;
513    
514     typedef struct {
515     int off;
516     int skip;
517     } tile_layout_t;
518    
519     tile_layout_t *offsetTable;
520    
521     void
522     processSlabSection(
523     fieldInfoThisEpoch_t *whichField,
524     int tileID,
525     void *data,
526     int myNumZSlabs)
527     {
528     int intracommSize,intracommRank;
529     // MPI_Comm_size(whichField->ioRanksIntracomm, &intracommSize);
530     MPI_Comm_rank(whichField->ioRanksIntracomm, &intracommRank);
531     //printf("i/o rank %d/%d recv'd %d::%d (%d->%d)\n",intracommRank,intracommSize,whichField,tileID,firstZ,lastZ);
532     // printf("rank %d : tile %d is gonna go at %d and stride with %d, z = %d\n",intracommRank,tileID,offsetTable[tileID].off,
533     // offsetTable[tileID].skip,myNumZSlabs);
534    
535     int z;
536    
537     int pickup = whichField->pickup;
538    
539     //ASSERT((tileID > 0) && (tileID < (sizeof(offsetTable)/sizeof(tile_layout_t))));
540     ASSERT( (tileID > 0) && (tileID <= NTILES) ); // offsetTable now dynamically allocated
541    
542     if (myNumZSlabs * twoDFieldSizeInBytes > outBufSize){
543    
544     free(outBuf);
545    
546     outBufSize = myNumZSlabs * twoDFieldSizeInBytes;
547    
548     outBuf = malloc(outBufSize);
549     ASSERT(outBuf);
550    
551     memset(outBuf,0,outBufSize);
552     }
553    
554     for (z=0;z<myNumZSlabs;++z){
555    
556     off_t zoffset = z*TILE_X*TILE_Y*NTILES; //NOT totalNumTiles;
557     off_t hoffset = offsetTable[tileID].off;
558     off_t skipdst = offsetTable[tileID].skip;
559    
560     //double *dst = outBuf + zoffset + hoffset;
561     void *dst;
562     if (pickup)
563     dst = outBuf + zoffset + hoffset;
564     else
565     dst = (float*)outBuf + zoffset + hoffset;
566    
567     off_t zoff = z*(TILE_X+2*XGHOSTS)*(TILE_Y+2*YGHOSTS);
568     off_t hoff = YGHOSTS*(TILE_X+2*XGHOSTS) + XGHOSTS;
569     double *src = (double*)data + zoff + hoff;
570    
571     off_t skipsrc = TILE_X+2*XGHOSTS;
572    
573     //fprintf(stderr,"rank %d tile %d offset %d skip %d dst %x src %x\n",intracommRank,tileID,hoffset,skipdst,dst,src);
574    
575     long long n = TILE_X;
576    
577     int y;
578     if (pickup)
579     for (y=0;y<TILE_Y;++y)
580     memcpy((double*)dst + y * skipdst, src + y * skipsrc, TILE_X*datumSize);
581     else
582     for (y=0;y<TILE_Y;++y)
583     memcpy_r8_2_r4((float*)dst + y * skipdst, src + y * skipsrc, &n);
584     }
585    
586     return;
587     }
588    
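// Ghost-stripping example with hypothetical tile dimensions (the real ones
// arrive via the broadcast in ioRankMain): if TILE_X = TILE_Y = 45 and
// XGHOSTS = YGHOSTS = 4, each incoming tile row is 45 + 2*4 = 53 doubles
// wide, hoff skips the 4 ghost rows plus the 4 leading ghost columns
// (4*53 + 4 = 216 doubles), and each memcpy above moves only the 45
// interior values of one row into its place in the global 90 x 1170 slab.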
589    
590    
591     // Allocate some buffers to receive tile-data from the compute ranks
592     void
593     allocateTileBufs(int numTileBufs, int maxIntracommSize)
594     {
595    
596     ASSERT(tileOneZLevelSizeInBytes>0); // be sure we have rec'vd values by now
597    
598     int i, j;
599     for (i = 0; i < numTileBufs; ++i) {
600    
601     bufHdr_t *newBuf = malloc(sizeof(bufHdr_t));
602     ASSERT(NULL != newBuf);
603    
604     newBuf->payload = malloc(tileOneZLevelSizeInBytes * NUM_Z);
605     ASSERT(NULL != newBuf->payload);
606    
607     newBuf->requests = malloc(maxIntracommSize * sizeof(MPI_Request));
608     ASSERT(NULL != newBuf->requests);
609    
610     // Init some values
611     newBuf->requestsArraySize = maxIntracommSize;
612     for (j = 0; j < maxIntracommSize; ++j) {
613     newBuf->requests[j] = MPI_REQUEST_NULL;
614     }
615     newBuf->ck0 = newBuf->ck1 = 0xdeadbeefdeadbeef;
616    
617     // Put the buf into the free list
618     newBuf->state = bufState_Free;
619     newBuf->next = freeTileBufs_ptr;
620     freeTileBufs_ptr = newBuf;
621     }
622     }
623    
624    
625    
626     bufHdr_t *
627     getFreeBuf()
628     {
629     int j;
630     bufHdr_t *rtnValue = freeTileBufs_ptr;
631    
632     if (NULL != rtnValue) {
633     ASSERT(bufState_Free == rtnValue->state);
634     freeTileBufs_ptr = rtnValue->next;
635     rtnValue->next = NULL;
636    
637     // Paranoia. This should already be the case
638     for (j = 0; j < rtnValue->requestsArraySize; ++j) {
639     rtnValue->requests[j] = MPI_REQUEST_NULL;
640     }
641     }
642     return rtnValue;
643     }
644    
645    
646     /////////////////////////////////////////////////////////////////
647    
648    
649     bufHdr_t *
650     tryToReceiveDataTile(
651     MPI_Comm dataIntercomm,
652     int epochID,
653     size_t expectedMsgSize,
654     int *tileID)
655     {
656     bufHdr_t *bufHdr;
657     int pending, i, count;
658     MPI_Status mpiStatus;
659    
660     MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, dataIntercomm,
661     &pending, &mpiStatus);
662    
663     // If no data are pending, or if we can't get a buffer to
664     // put the pending data into, return NULL
665     if (!pending) return NULL;
666     if (((bufHdr = getFreeBuf()) == NULL)) {
667     FPRINTF(stderr,"tile %d(%d) pending, but no buf to recv it\n",
668     mpiStatus.MPI_TAG, mpiStatus.MPI_TAG >> numCheckBits);
669     return NULL;
670     }
671    
672     // Do sanity checks on the pending message
673     MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
674     ASSERT(count == expectedMsgSize);
675     ASSERT((mpiStatus.MPI_TAG & bitMask) == (epochID & bitMask));
676    
677     // Receive the data we saw in the iprobe
678     MPI_Recv(bufHdr->payload, expectedMsgSize, MPI_BYTE,
679     mpiStatus.MPI_SOURCE, mpiStatus.MPI_TAG,
680     dataIntercomm, &mpiStatus);
681     bufHdr->state = bufState_InUse;
682    
683     // Overrun check
684     ASSERT(bufHdr->ck0==0xdeadbeefdeadbeef);
685     ASSERT(bufHdr->ck0==bufHdr->ck1);
686    
687     // Return values
688     *tileID = mpiStatus.MPI_TAG >> numCheckBits;
689    
690    
691     FPRINTF(stderr, "recv tile %d(%d) from compute rank %d\n",
692     mpiStatus.MPI_TAG, *tileID, mpiStatus.MPI_SOURCE);
693    
694     return bufHdr;
695     }
696    
697    
698    
699     int
700     tryToReceiveZSlab(
701     void *buf,
702     int expectedMsgSize,
703     MPI_Comm intracomm)
704     {
705     MPI_Status mpiStatus;
706     int pending, count;
707    
708     MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, intracomm, &pending, &mpiStatus);
709     if (!pending) return -1;
710    
711     MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
712     ASSERT(count == expectedMsgSize);
713    
714     MPI_Recv(buf, count, MPI_BYTE, mpiStatus.MPI_SOURCE,
715     mpiStatus.MPI_TAG, intracomm, &mpiStatus);
716    
717     FPRINTF(stderr, "recv slab %d from rank %d\n",
718     mpiStatus.MPI_TAG, mpiStatus.MPI_SOURCE);
719    
720     // return the tileID
721     return mpiStatus.MPI_TAG;
722     }
723    
724    
725    
726     void
727     redistributeZSlabs(
728     bufHdr_t *bufHdr,
729     int tileID,
730     int zSlabsPer,
731     int thisFieldNumZ,
732     MPI_Comm intracomm)
733     {
734     int pieceSize = zSlabsPer * tileOneZLevelSizeInBytes;
735     int tileSizeInBytes = tileOneZLevelSizeInBytes * thisFieldNumZ;
736     int offset = 0;
737     int recvRank = 0;
738    
739     while ((offset + pieceSize) <= tileSizeInBytes) {
740    
741     MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
742     tileID, intracomm, &(bufHdr->requests[recvRank]));
743     ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
744    
745     offset += pieceSize;
746     recvRank += 1;
747     }
748    
749     // There might be one last odd-sized piece
750     if (offset < tileSizeInBytes) {
751     pieceSize = tileSizeInBytes - offset;
752     ASSERT(pieceSize % tileOneZLevelSizeInBytes == 0);
753    
754     MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
755     tileID, intracomm, &(bufHdr->requests[recvRank]));
756     ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
757    
758     offset += pieceSize;
759     recvRank += 1;
760     }
761    
762     // Sanity check
763     ASSERT(recvRank <= bufHdr->requestsArraySize);
764     while (recvRank < bufHdr->requestsArraySize) {
765     ASSERT(MPI_REQUEST_NULL == bufHdr->requests[recvRank++]);
766     }
767     }
768    
769    
770    
771     /////////////////////////////////////////////////////////////////
772     // This is called by the i/o ranks
773     void
774     doNewEpoch(int epochID, int epochStyleIndex, int gcmIter)
775     {
776     // In an i/o epoch, the i/o ranks are partitioned into groups,
777     // each group dealing with one field. The ranks within a group:
778     // (1) receive data tiles from the compute ranks
779     // (2) slice the received data tiles into slabs in the 'z'
780     // dimension, and redistribute the tile-slabs among the group.
781     // (3) receive redistributed tile-slabs, and reconstruct a complete
782     // field-slab for the whole field.
783     // (4) Write the completed field-slab to disk.
784    
785     fieldInfoThisEpoch_t *fieldInfo;
786     int intracommRank, intracommSize;
787    
788     int zSlabsPer;
789     int myNumZSlabs, myFirstZSlab, myLastZSlab, myNumSlabPiecesToRecv;
790    
791     int numTilesRecvd, numSlabPiecesRecvd;
792    
793    
794     // Find the descriptor for my assigned field for this epoch style.
795     // It is the one whose dataIntercomm is not null
796     fieldInfo = epochStyles[epochStyleIndex];
797     while (MPI_COMM_NULL == fieldInfo->dataIntercomm) ++fieldInfo;
798     ASSERT('\0' != fieldInfo->dataFieldID);
799    
800    
801    
802     MPI_Comm_rank(fieldInfo->ioRanksIntracomm, &intracommRank);
803     MPI_Comm_size(fieldInfo->ioRanksIntracomm, &intracommSize);
804    
805    
806     // Compute which z slab(s) we will be reassembling.
807     zSlabsPer = divCeil(fieldInfo->zDepth, intracommSize);
808     myNumZSlabs = zSlabsPer;
809    
810     // Adjust myNumZSlabs in case it didn't divide evenly
811     myFirstZSlab = intracommRank * myNumZSlabs;
812     if (myFirstZSlab >= fieldInfo->zDepth) {
813     myNumZSlabs = 0;
814     } else if ((myFirstZSlab + myNumZSlabs) > fieldInfo->zDepth) {
815     myNumZSlabs = fieldInfo->zDepth - myFirstZSlab;
816     } else {
817     myNumZSlabs = zSlabsPer;
818     }
819     myLastZSlab = myFirstZSlab + myNumZSlabs - 1;
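// Worked example: for a 3-D field (zDepth == NUM_Z == 50) handled by a
// group of, say, 8 i/o ranks, zSlabsPer = divCeil(50,8) = 7; ranks 0..6
// each reassemble 7 z-levels (0..48) and rank 7 gets only the last one
// (z = 49).  With more than 50 ranks, the extras would get zero.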
820    
821     // If we were not assigned any z-slabs, we don't get any redistributed
822     // tile-slabs. If we were assigned one or more slabs, we get
823     // redistributed tile-slabs for every tile.
824     myNumSlabPiecesToRecv = (0 == myNumZSlabs) ? 0 : totalNumTiles;
825    
826    
827     numTilesRecvd = 0;
828     numSlabPiecesRecvd = 0;
829    
830     ////////////////////////////////////////////////////////////////////
831     // Main loop. Handle tiles from the current epoch
832     for(;;) {
833     bufHdr_t *bufHdr;
834    
835     ////////////////////////////////////////////////////////////////
836     // Check for tiles from the computational tasks
837     while (numTilesRecvd < fieldInfo->tileCount) {
838     int tileID = -1;
839     size_t msgSize = tileOneZLevelSizeInBytes * fieldInfo->zDepth;
840     bufHdr = tryToReceiveDataTile(fieldInfo->dataIntercomm, epochID,
841     msgSize, &tileID);
842     if (NULL == bufHdr) break; // No tile was received
843    
844     numTilesRecvd += 1;
845     redistributeZSlabs(bufHdr, tileID, zSlabsPer,
846     fieldInfo->zDepth, fieldInfo->ioRanksIntracomm);
847    
848     // Add the bufHdr to the "in use" list
849     bufHdr->next = inUseTileBufs_ptr;
850     inUseTileBufs_ptr = bufHdr;
851    
852    
853     }
854    
855    
856     ////////////////////////////////////////////////////////////////
857     // Check for tile-slabs redistributed from the i/o processes
858     while (numSlabPiecesRecvd < myNumSlabPiecesToRecv) {
859     int msgSize = tileOneZLevelSizeInBytes * myNumZSlabs;
860     char data[msgSize];
861    
862     int tileID = tryToReceiveZSlab(data, msgSize, fieldInfo->ioRanksIntracomm);
863     if (tileID < 0) break; // No slab was received
864    
865    
866     numSlabPiecesRecvd += 1;
867     processSlabSection(fieldInfo, tileID, data, myNumZSlabs);
868    
869    
870     // Can do the write here, or at the end of the epoch.
871     // Probably want to make it asynchronous (waiting for
872     // completion at the barrier at the start of the next epoch).
873     //if (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) {
874     // ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
875     // do_write(io_epoch,whichField,myFirstZSlab,myNumZSlabs);
876     //}
877     }
878    
879    
880     ////////////////////////////////////////////////////////////////
881     // Check if we can release any of the inUse buffers (i.e. the
882     // isends we used to redistribute those z-slabs have completed).
883     // We do this by detaching the inUse list, then examining each
884     // element in the list, putting each element either into the
885     // free list or back into the inUse list, as appropriate.
886     bufHdr = inUseTileBufs_ptr;
887     inUseTileBufs_ptr = NULL;
888     while (NULL != bufHdr) {
889     int count = 0;
890     int completions[intracommSize];
891     MPI_Status statuses[intracommSize];
892    
893     // Acquire the "next" bufHdr now, before we change anything.
894     bufHdr_t *nextBufHeader = bufHdr->next;
895    
896     // Check to see if the Isend requests have completed for this buf
897     MPI_Testsome(intracommSize, bufHdr->requests,
898     &count, completions, statuses);
899    
900     if (MPI_UNDEFINED == count) {
901     // A return of UNDEFINED means that none of the requests
902     // is still outstanding, i.e. they have all completed.
903     // Put the buf on the free list.
904     bufHdr->state = bufState_Free;
905     bufHdr->next = freeTileBufs_ptr;
906     freeTileBufs_ptr = bufHdr;
907     FPRINTF(stderr,"Rank %d freed a tile buffer\n", intracommRank);
908     } else {
909     // At least one request is still outstanding.
910     // Put the buf on the inUse list.
911     bufHdr->next = inUseTileBufs_ptr;
912     inUseTileBufs_ptr = bufHdr;
913     }
914    
915     // Continue with the next buffer
916     bufHdr = nextBufHeader;
917     }
918    
919    
920     ////////////////////////////////////////////////////////////////
921     // Check if the epoch is complete
922     if ((numTilesRecvd >= fieldInfo->tileCount) &&
923     (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) &&
924     (NULL == inUseTileBufs_ptr))
925     {
926     ASSERT(numTilesRecvd == fieldInfo->tileCount);
927     ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
928    
929     //fprintf(stderr,"rank %d %d %d %d\n",intracommRank,numTilesRecvd,numSlabPiecesRecvd,myNumZSlabs);
930    
931     // Ok, wrap up the current i/o epoch
932     // Probably want to make this asynchronous (waiting for
933     // completion at the start of the next epoch).
934     do_write(epochID, fieldInfo, myFirstZSlab, myNumZSlabs, gcmIter);
935    
936     // The epoch is complete
937     return;
938     }
939     }
940    
941     }
942    
943    
944    
945    
946    
947    
948    
949     // Main routine for the i/o tasks. Callers DO NOT RETURN from
950     // this routine; they MPI_Finalize and exit.
951     void
952     ioRankMain(int totalNumTiles)
953     {
954     // This is one of a group of i/o processes that will be receiving tiles
955     // from the computational processes. This same process is also
956     // responsible for compositing slices of tiles together into one or
957     // more complete slabs, and then writing those slabs out to disk.
958     //
959     // When a tile is received, it is cut up into slices, and each slice is
960     // sent to the appropriate compositing task that is dealing with those
961     // "z" level(s) (possibly including ourselves). These are tagged with
962     // the tile-id. When we *receive* such a message (from ourselves, or
963     // from any other process in this group), we unpack the data (stripping
964     // the ghost cells), and put them into the slab we are assembling. Once
965     // a slab is fully assembled, it is written to disk.
966    
967     int i, size, count, numTileBufs;
968     MPI_Status status;
969     int currentEpochID = 0;
970     int maxTileCount = 0, max3DTileCount = 0, maxIntracommSize = 0;
971    
972     int numComputeRanks, epochStyleIndex;
973     MPI_Comm_remote_size(globalIntercomm, &numComputeRanks);
974    
975    
976     ////////////////////////////////////////////////////////////
977     //// get global tile info via bcast over the global intercomm
978    
979    
980     // int ierr = MPI_Bcast(&NTILES,1,MPI_INT,0,globalIntercomm);
981    
982     int data[9];
983    
984     int ierr = MPI_Bcast(data,9,MPI_INT,0,globalIntercomm);
985    
986     NTILES = data[3];
987     TILE_X = data[5];
988     TILE_Y = data[6];
989     XGHOSTS = data[7];
990     YGHOSTS = data[8];
991    
992     tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
993     tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // also calculated in compute ranks, urghh
994    
995     int *xoff = malloc(NTILES*sizeof(int));
996     ASSERT(xoff);
997     int *yoff = malloc(NTILES*sizeof(int));
998     ASSERT(yoff);
999     int *xskip = malloc(NTILES*sizeof(int));
1000     ASSERT(xskip);
1001    
1002     offsetTable = malloc((NTILES+1)*sizeof(tile_layout_t));
1003     ASSERT(offsetTable);
1004    
1005     ierr = MPI_Bcast(xoff,NTILES,MPI_INT,0,globalIntercomm);
1006     ierr = MPI_Bcast(yoff,NTILES,MPI_INT,0,globalIntercomm);
1007     ierr = MPI_Bcast(xskip,NTILES,MPI_INT,0,globalIntercomm);
1008    
1009     for (i=0;i<NTILES;++i){
1010     offsetTable[i+1].off = NUM_X*(yoff[i]-1) + xoff[i] - 1;
1011     offsetTable[i+1].skip = xskip[i];
1012     }
1013    
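// offsetTable[] turns each tile's 1-based (xoff,yoff) corner into a word
// offset within one z-level of the 90 x 1170 global field, with skip giving
// the row stride of that tile in the output array.  For example (hypothetical
// corner values), a tile registered with xoff = 46, yoff = 91 gets
// off = 90*(91-1) + 46 - 1 = 8145.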
1014     free(xoff);
1015     free(yoff);
1016     free(xskip);
1017    
1018     ///////////////////////////////////////////////////////////////
1019    
1020    
1021     // At this point, the multitude of various inter-communicators have
1022     // been created and stored in the various epoch "style" arrays.
1023     // The next step is to have the computational tasks "register" the
1024     // data tiles they will send. In this version of the code, we only
1025     // track and store *counts*, not the actual tileIDs.
1026     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1027     fieldInfoThisEpoch_t *p = NULL;
1028     fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1029     int thisIntracommSize;
1030    
1031     // Find the field we were assigned for this epoch style
1032     int curFieldIndex = -1;
1033     while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
1034     p = &(thisEpochStyle[curFieldIndex]);
1035     if (MPI_COMM_NULL != p->registrationIntercomm) break;
1036     }
1037     ASSERT(NULL != p);
1038     ASSERT('\0' != p->dataFieldID);
1039     MPI_Comm_size(p->ioRanksIntracomm, &size);
1040     if (size > maxIntracommSize) maxIntracommSize = size;
1041    
1042    
1043     // Receive a message from *each* computational rank telling us
1044     // a count of how many tiles that rank will send during epochs
1045     // of this style. Note that some of these counts may be zero
1046     for (i = 0; i < numComputeRanks; ++i) {
1047     MPI_Recv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG,
1048     p->registrationIntercomm, &status);
1049     p->tileCount += status.MPI_TAG;
1050     }
1051     if (p->tileCount > maxTileCount) maxTileCount = p->tileCount;
1052     if (p->zDepth > 1) {
1053     if (p->tileCount > max3DTileCount) max3DTileCount = p->tileCount;
1054     }
1055    
1056     // Sanity check
1057     MPI_Allreduce (&(p->tileCount), &count, 1,
1058     MPI_INT, MPI_SUM, p->ioRanksIntracomm);
1059     ASSERT(count == totalNumTiles);
1060     }
1061    
1062    
1063     // In some as-of-yet-undetermined fashion, we decide how many
1064     // buffers to allocate to hold tiles received from the computational
1065     // tasks. The number of such buffers is more-or-less arbitrary (the
1066     // code should be able to function with any number greater than zero).
1067     // More buffers means more parallelism, but uses more space.
1068     // Note that maxTileCount is an upper bound on the number of buffers
1069     // that we can usefully use. Note also that if we are assigned a 2D
1070     field, we might be assigned to receive a very large number of tiles
1071     // for that field in that epoch style.
1072    
1073     // Hack - for now, just pick a value for numTileBufs
1074     numTileBufs = (max3DTileCount > 0) ? max3DTileCount : maxTileCount;
1075     if (numTileBufs < 4) numTileBufs = 4;
1076     if (numTileBufs > 15) numTileBufs = 15;
1077     // if (numTileBufs > 25) numTileBufs = 25;
1078    
1079     allocateTileBufs(numTileBufs, maxIntracommSize);
1080     countBufs(numTileBufs);
1081    
1082    
1083     ////////////////////////////////////////////////////////////////////
1084     // Main loop.
1085     ////////////////////////////////////////////////////////////////////
1086    
1087     for (;;) {
1088     int cmd[4];
1089    
1090     // Make sure all the i/o ranks have completed processing the prior
1091     // cmd (i.e. the prior i/o epoch is complete).
1092     MPI_Barrier(ioIntracomm);
1093    
1094     if (0 == ioIntracommRank) {
1095     fprintf(stderr, "I/O ranks waiting for new epoch\n");
1096     MPI_Send(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, globalIntercomm);
1097    
1098     MPI_Recv(cmd, 4, MPI_INT, 0, 0, globalIntercomm, MPI_STATUS_IGNORE);
1099     fprintf(stderr, "I/O ranks beginning new epoch: %d, gcmIter = %d\n", cmd[1],cmd[3]);
1100    
1101     // before we start a new epoch, have i/o rank 0:
1102     // determine output filenames for this epoch
1103     // clean up any extant files with same names
1104     // write .meta files
1105    
1106     if (cmd_exit != cmd[0]){
1107    
1108     fprintf(stderr,"new epoch: epoch %d, style %d, gcmIter %d\n", cmd[1],cmd[2],cmd[3]);
1109    
1110     int epochStyle = cmd[2];
1111     int gcmIter = cmd[3];
1112    
1113     fieldInfoThisEpoch_t *fieldInfo;
1114     fieldInfo = epochStyles[epochStyle];
1115     char s[1024];
1116     int res;
1117     FILE *fp;
1118    
1119     if (fieldInfo->pickup==0){ // for non-pickups, need to loop over individual fields
1120     char f;
1121     while ((f = fieldInfo->dataFieldID) != '\0'){
1122     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1123     fprintf(stderr,"%s\n",s);
1124     res = unlink(s);
1125     if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1126    
1127     // skip writing meta files for non-pickup fields
1128     /*
1129     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
1130     fp = fopen(s,"w+");
1131     fclose(fp);
1132     */
1133    
1134     ++fieldInfo;
1135    
1136     }
1137     }
1138    
1139     else { // single pickup or pickup_seaice file
1140    
1141     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1142     fprintf(stderr,"%s\n",s);
1143     res = unlink(s);
1144     if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1145    
1146     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
1147     fp = fopen(s,"w+");
1148     write_pickup_meta(fp, gcmIter, fieldInfo->pickup);
1149     fclose(fp);
1150    
1151     }
1152     }
1153     }
1154     MPI_Bcast(cmd, 4, MPI_INT, 0, ioIntracomm);
1155    
1156     switch (cmd[0]) {
1157    
1158     case cmd_exit:
1159     // Don't bother trying to disconnect and free the
1160     // plethora of communicators; just exit.
1161     FPRINTF(stderr,"Received shutdown message\n");
1162     MPI_Finalize();
1163     exit(0);
1164     break;
1165    
1166     case cmd_newEpoch:
1167     if ((currentEpochID + 1) != cmd[1]) {
1168     fprintf(stderr, "ERROR: Missing i/o epoch? was %d, "
1169     "but is now %d ??\n", currentEpochID, cmd[1]);
1170     sleep(1); // Give the message a chance to propagate
1171     abort();
1172     }
1173     currentEpochID = cmd[1];
1174    
1175     if (outBuf) memset(outBuf,0,outBufSize); // zero the outBuf, so dry tiles are well defined
1176    
1177     doNewEpoch(cmd[1], cmd[2], cmd[3]);
1178     break;
1179    
1180     default:
1181     fprintf(stderr, "Unexpected epoch command: %d %d %d %d\n",
1182     cmd[0], cmd[1], cmd[2], cmd[3]);
1183     sleep(1);
1184     abort();
1185     break;
1186     }
1187    
1188     }
1189    
1190     }
1191    
1192    
1193    
1194     ///////////////////////////////////////////////////////////////////////////////////
1195    
1196     int
1197     findField(const char c)
1198     {
1199     int i;
1200     for (i = 0; i < numAllFields; ++i) {
1201     if (c == fieldDepths[i].dataFieldID) return i;
1202     }
1203    
1204     // Give the error message a chance to propagate before exiting.
1205     fprintf(stderr, "ERROR: Field not found: '%c'\n", c);
1206     sleep(1);
1207     abort();
1208     }
1209    
1210    
1211    
1212     // Given a number of ranks and a set of fields we will want to output,
1213     // figure out how to distribute the ranks among the fields.
1214     void
1215     computeIORankAssigments(
1216     int numComputeRanks,
1217     int numIORanks,
1218     int numFields,
1219     fieldInfoThisEpoch_t *fields,
1220     int assignments[])
1221     {
1222     int i,j,k, sum, count;
1223    
1224     int numIONodes = numIORanks / numRanksPerNode;
1225     int numIORanksThisField[numFields];
1226     long int bytesPerIORankThisField[numFields];
1227     int expectedMessagesPerRankThisField[numFields];
1228     long int fieldSizes[numFields];
1229    
1230     struct ioNodeInfo_t {
1231     int expectedNumMessagesThisNode;
1232     int numCoresAssigned;
1233     int *assignedFieldThisCore;
1234     } ioNodeInfo[numIONodes];
1235    
1236     // Since numRanksPerNode might be dynamically determined, we have
1237     // to dynamically allocate the assignedFieldThisCore arrays.
1238     for (i = 0; i < numIONodes; ++i) {
1239     ioNodeInfo[i].assignedFieldThisCore = malloc(sizeof(int)*numRanksPerNode);
1240     ASSERT(NULL != ioNodeInfo[i].assignedFieldThisCore);
1241     }
1242    
1243     ASSERT((numIONodes*numRanksPerNode) == numIORanks);
1244    
1245    
1246     //////////////////////////////////////////////////////////////
1247     // Compute the size for each field in this epoch style
1248     for (i = 0; i < numFields; ++i) {
1249     ASSERT((1 == fields[i].zDepth) || (NUM_Z == fields[i].zDepth) || (MULTDIM == fields[i].zDepth));
1250     fieldSizes[i] = twoDFieldSizeInBytes * fields[i].zDepth;
1251     }
1252    
1253    
1254     /////////////////////////////////////////////////////////
1255     // Distribute the available i/o ranks among the fields,
1256     // proportionally based on field size.
1257    
1258     // First, assign one rank per field
1259     ASSERT(numIORanks >= numFields);
1260     for (i = 0; i < numFields; ++i) {
1261     numIORanksThisField[i] = 1;
1262     bytesPerIORankThisField[i] = fieldSizes[i];
1263     }
1264    
1265     // Now apportion any extra ranks
1266     for (; i < numIORanks; ++i) {
1267    
1268     // Find the field 'k' with the most bytesPerIORank
1269     k = 0;
1270     for (j = 1; j < numFields; ++j) {
1271     if (bytesPerIORankThisField[j] > bytesPerIORankThisField[k]) {
1272     k = j;
1273     }
1274     }
1275    
1276     // Assign an i/o rank to that field
1277     numIORanksThisField[k] += 1;
1278     bytesPerIORankThisField[k] = fieldSizes[k] / numIORanksThisField[k];
1279     }
1280    
1281     ////////////////////////////////////////////////////////////
1282     // At this point, all the i/o ranks should be apportioned
1283     // among the fields. Check we didn't screw up the count.
1284     for (sum = 0, i = 0; i < numFields; ++i) {
1285     sum += numIORanksThisField[i];
1286     }
1287     ASSERT(sum == numIORanks);
1288    
1289    
1290    
1291     //////////////////////////////////////////////////////////////////
1292     // The *number* of i/o ranks assigned to a field is based on the
1293     // field size. But the *placement* of those ranks is based on
1294     // the expected number of messages the process will receive.
1295     // [In particular, if we have several fields each with only one
1296     // assigned i/o rank, we do not want to place them all on the
1297     // same node if we don't have to.] This traffic-load balance
1298     // strategy is an approximation at best since the messages may
1299     // not all be the same size (e.g. 2D fields vs. 3D fields), so
1300     // "number of messages" is not the same thing as "traffic".
1301     // But it should suffice.
1302    
1303     // Init a couple of things
1304     for (i = 0; i < numFields; ++i) {
1305     expectedMessagesPerRankThisField[i] =
1306     numComputeRanks / numIORanksThisField[i];
1307     }
1308     for (i = 0; i < numIONodes; ++i) {
1309     ioNodeInfo[i].expectedNumMessagesThisNode = 0;
1310     ioNodeInfo[i].numCoresAssigned = 0;
1311     for (j = 0; j < numRanksPerNode; ++j) {
1312     ioNodeInfo[i].assignedFieldThisCore[j] = -1;
1313     }
1314     }
1315    
1316     ///////////////////////////////////////////////////////////////////
1317     // Select the i/o node with the smallest expectedNumMessages, and
1318     // assign it a rank from the field with the highest remaining
1319     // expectedMessagesPerRank. Repeat until everything is assigned.
1320     // (Yes, this could be a lot faster, but isn't worth the trouble.)
1321    
1322     for (count = 0; count < numIORanks; ++count) {
1323    
1324     // Make an initial choice of node
1325     for (i = 0; i < numIONodes; ++i) {
1326     if (ioNodeInfo[i].numCoresAssigned < numRanksPerNode) break;
1327     }
1328     j = i;
1329     ASSERT(j < numIONodes);
1330    
1331     // Search for a better choice
1332     for (++i; i < numIONodes; ++i) {
1333     if (ioNodeInfo[i].numCoresAssigned >= numRanksPerNode) continue;
1334     if (ioNodeInfo[i].expectedNumMessagesThisNode <
1335     ioNodeInfo[j].expectedNumMessagesThisNode)
1336     {
1337     j = i;
1338     }
1339     }
1340    
1341    
1342     // Make an initial choice of field
1343     for (i = 0; i < numFields; ++i) {
1344     if (numIORanksThisField[i] > 0) break;
1345     }
1346     k = i;
1347     ASSERT(k < numFields);
1348    
1349     // Search for a better choice
1350     for (++i; i < numFields; ++i) {
1351     if (numIORanksThisField[i] <= 0) continue;
1352     if (expectedMessagesPerRankThisField[i] >
1353     expectedMessagesPerRankThisField[k])
1354     {
1355     k = i;
1356     }
1357     }
1358    
1359     // Put one rank from the chosen field onto the chosen node
1360     ioNodeInfo[j].expectedNumMessagesThisNode += expectedMessagesPerRankThisField[k];
1361     ioNodeInfo[j].assignedFieldThisCore[ioNodeInfo[j].numCoresAssigned] = k;
1362     ioNodeInfo[j].numCoresAssigned += 1;
1363     numIORanksThisField[k] -= 1;
1364     }
1365    
1366     // Sanity check - all ranks were assigned to a core
1367     for (i = 0; i < numFields; ++i) {
1368     ASSERT(0 == numIORanksThisField[i]);
1369     }
1370     // Sanity check - all cores were assigned a rank
1371     for (i = 0; i < numIONodes; ++i) {
1372     ASSERT(numRanksPerNode == ioNodeInfo[i].numCoresAssigned);
1373     }
1374    
1375    
1376     /////////////////////////////////////
1377     // Return the computed assignments
1378     for (i = 0; i < numIONodes; ++i) {
1379     for (j = 0; j < numRanksPerNode; ++j) {
1380     assignments[i*numRanksPerNode + j] =
1381     ioNodeInfo[i].assignedFieldThisCore[j];
1382     }
1383     }
1384    
1385     // Clean up
1386     for (i = 0; i < numIONodes; ++i) {
1387     free(ioNodeInfo[i].assignedFieldThisCore);
1388     }
1389    
1390     }
1391    
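// Worked example of the apportionment above (hypothetical epoch): three
// fields with zDepth 50, 50 and 1 have sizes 42.12 MB, 42.12 MB and 0.84 MB.
// With numIORanks = 5, each field first gets one rank; the two extra ranks
// then go to whichever field currently has the largest bytes-per-rank, i.e.
// one to each 3-D field, ending with 2, 2 and 1 ranks respectively.  The
// *placement* of those ranks across the i/o nodes is then balanced by the
// expected message count, as described in the comments above.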
1392     //////////////////////////////////////////////////////////////////////////////////
1393    
1394    
1395    
1396    
1397     int
1398     isIORank(int commRank, int totalNumNodes, int numIONodes)
1399     {
1400     // Figure out if this rank is on a node that will do i/o.
1401     // Note that the i/o nodes are distributed throughout the
1402     // task, not clustered together.
1403     int ioNodeStride = totalNumNodes / numIONodes;
1404     int thisRankNode = commRank / numRanksPerNode;
1405     int n = thisRankNode / ioNodeStride;
1406     return (((n * ioNodeStride) == thisRankNode) && (n < numIONodes)) ? 1 : 0;
1407     }
1408    
1409    
1410     // Find the number of *physical cores* on the current node
1411     // (ignore hyperthreading). This should work for pretty much
1412     // any Linux based system (and fail horribly for anything else).
1413     int
1414     getNumCores(void)
1415     {
1416     return 20; // until we figure out why this cratered
1417    
1418     char *magic = "cat /proc/cpuinfo | egrep \"core id|physical id\" | "
1419     "tr -d \"\\n\" | sed s/physical/\\\\nphysical/g | "
1420     "grep -v ^$ | sort -u | wc -l";
1421    
1422     FILE *fp = popen(magic,"r");
1423     ASSERT(fp);
1424    
1425     int rtnValue = -1;
1426    
1427     int res = fscanf(fp,"%d",&rtnValue);
1428    
1429     ASSERT(1==res);
1430    
1431     pclose(fp);
1432    
1433     ASSERT(rtnValue > 0);
1434     return rtnValue;
1435     }
1436    
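// A simpler fallback sketch: sysconf() counts *logical* CPUs, so with
// hyperthreading enabled it over-counts physical cores, but it avoids
// the shell pipeline above entirely (needs only <unistd.h>, already included):
static int getNumLogicalCpus(void)
{
    long n = sysconf(_SC_NPROCESSORS_ONLN);   // logical CPUs currently online
    return (n > 0) ? (int) n : 1;
}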
1437    
1438    
1439     ////////////////////////////////////////////////////////////////////////
1440     ////////////////////////////////////////////////////////////////////////
1441     // Routines called by the mitGCM code
1442    
1443    
1444     ////////////////////////////////////////
1445     // Various "one time" initializations
1446     void
1447     f1(
1448     MPI_Comm parentComm,
1449     int numComputeRanks,
1450     int numTiles,
1451     MPI_Comm *rtnComputeComm)
1452     {
1453     MPI_Comm newIntracomm, newIntercomm, dupParentComm;
1454     int newIntracommRank, thisIsIORank;
1455     int parentSize, parentRank;
1456     int numIONodes, numIORanks;
1457     int mpiIsInitialized, *intPtr, tagUBexists;
1458     int i, j, nF, compRoot, fieldIndex, epochStyleIndex;
1459     int totalNumNodes, numComputeNodes, newIntracommSize;
1460    
1461     // Init globals
1462     totalNumTiles = numTiles;
1463     if (numRanksPerNode <= 0) {
1464     // Might also want to check for an env var (or something)
1465     numRanksPerNode = getNumCores();
1466     }
1467    
1468     // Fill in the zDepth field of the fieldInfoThisEpoch_t descriptors
1469     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1470     fieldInfoThisEpoch_t *f, *thisEpochStyle = epochStyles[epochStyleIndex];
1471    
1472     int curFieldIndex = -1;
1473     while ((f = &(thisEpochStyle[++curFieldIndex]))->dataFieldID != '\0') {
1474     i = findField(f->dataFieldID);
1475     if (-1 != f->zDepth) {
1476     if (f->zDepth != fieldDepths[i].numZ) {
1477     fprintf(stderr, "Inconsistent z-depth for field '%c': "
1478     "fieldDepths[%d] and epoch style %d\n",
1479     f->dataFieldID, i, epochStyleIndex);
1480     }
1481     }
1482     f->zDepth = fieldDepths[i].numZ;
1483     }
1484     }
1485    
1486    
1487     // Find the max MPI "tag" value
1488     MPI_Attr_get(MPI_COMM_WORLD, MPI_TAG_UB, &intPtr, &tagUBexists);
1489     ASSERT(tagUBexists);
1490     maxTagValue = *intPtr;
1491    
1492    
1493     // Info about the parent communicator
1494     MPI_Initialized(&mpiIsInitialized);
1495     ASSERT(mpiIsInitialized);
1496     MPI_Comm_size(parentComm, &parentSize);
1497     MPI_Comm_rank(parentComm, &parentRank);
1498    
1499    
1500     // Figure out how many nodes we can use for i/o.
1501     // To make things (e.g. memory calculations) simpler, we want
1502     // a node to have either all compute ranks, or all i/o ranks.
1503     // If numRanksPerNode does not evenly divide numComputeRanks, we have
1504     // to round up in favor of the compute side.
1505    
1506     totalNumNodes = divCeil(parentSize, numRanksPerNode);
1507     numComputeNodes = divCeil(numComputeRanks, numRanksPerNode);
1508     numIONodes = (parentSize - numComputeRanks) / numRanksPerNode;
1509     ASSERT(numIONodes > 0);
1510     ASSERT(numIONodes <= (totalNumNodes - numComputeNodes));
1511     numIORanks = numIONodes * numRanksPerNode;
1512    
1513    
1514     // It is surprisingly easy to launch the job with the wrong number
1515     // of ranks, particularly if the number of compute ranks is not a
1516     // multiple of numRanksPerNode (the tendency is to just launch one
1517     // rank per core for all available cores). So we introduce a third
1518     // option for a rank: besides being an i/o rank or a compute rank,
1519     // it might instead be an "excess" rank, in which case it just
1520     // calls MPI_Finalize and exits.
1521    
1522     typedef enum { isCompute, isIO, isExcess } rankType;
1523     rankType thisRanksType;
1524    
1525     if (isIORank(parentRank, totalNumNodes, numIONodes)) {
1526     thisRanksType = isIO;
1527     } else if (parentRank >= numComputeRanks + numIORanks) {
1528     thisRanksType = isExcess;
1529     } else {
1530     thisRanksType = isCompute;
1531     }
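    // Worked example (illustrative numbers only): parentSize = 220,
    // numRanksPerNode = 20, numComputeRanks = 192 gives totalNumNodes = 11,
    // numComputeNodes = 10, numIONodes = 1, and numIORanks = 20.  Node 0
    // (ranks 0-19) does i/o, ranks 20-211 compute, and ranks 212-219 are
    // "excess" and exit just below.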
1532    
1533     // Split the parent communicator into parts
1534     MPI_Comm_split(parentComm, thisRanksType, parentRank, &newIntracomm);
1535     MPI_Comm_rank(newIntracomm, &newIntracommRank);
1536    
1537     // Excess ranks disappear
1538     // N.B. "parentSize" still includes these ranks.
1539     if (isExcess == thisRanksType) {
1540     MPI_Finalize();
1541     exit(0);
1542     }
1543    
1544     // Sanity check
1545     MPI_Comm_size(newIntracomm, &newIntracommSize);
1546     if (isIO == thisRanksType) {
1547     ASSERT(newIntracommSize == numIORanks);
1548     } else {
1549     ASSERT(newIntracommSize == numComputeRanks);
1550     }
1551    
1552    
1553     // Create a special intercomm from the i/o and compute parts
1554     if (isIO == thisRanksType) {
1555     // Set globals
1556     ioIntracomm = newIntracomm;
1557     MPI_Comm_rank(ioIntracomm, &ioIntracommRank);
1558    
1559     // Find the lowest computation rank
1560     for (i = 0; i < parentSize; ++i) {
1561     if (!isIORank(i, totalNumNodes, numIONodes)) break;
1562     }
1563     } else { // isCompute
1564     // Set globals
1565     computeIntracomm = newIntracomm;
1566     MPI_Comm_rank(computeIntracomm, &computeIntracommRank);
1567    
1568     // Find the lowest IO rank
1569     for (i = 0; i < parentSize; ++i) {
1570     if (isIORank(i, totalNumNodes, numIONodes)) break;
1571     }
1572     }
1573     MPI_Intercomm_create(newIntracomm, 0, parentComm, i, 0, &globalIntercomm);
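    // Both sides pass their own newIntracomm with local leader 0, and the
    // lowest-numbered rank of the *other* group (found in the loops above)
    // as the remote leader within parentComm.  The resulting globalIntercomm
    // carries the per-epoch handshake (and the global-size broadcasts)
    // between compute rank 0 and i/o rank 0.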
1574    
1575    
1576    
1577     ///////////////////////////////////////////////////////////////////
1578     // For each different i/o epoch style, split the i/o ranks among
1579     // the fields, and create an inter-communicator for each split.
1580    
1581     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1582     fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1583     int fieldAssignments[numIORanks];
1584     int rankAssignments[numComputeRanks + numIORanks];
1585    
1586     // Count the number of fields in this epoch style
1587     for (nF = 0; thisEpochStyle[nF].dataFieldID != '\0'; ++nF) ;
1588    
1589     // Decide how to apportion the i/o ranks among the fields
1590     for (i=0; i < numIORanks; ++i) fieldAssignments[i] = -1;
1591     computeIORankAssigments(numComputeRanks, numIORanks, nF,
1592     thisEpochStyle, fieldAssignments);
1593     // Sanity check
1594     for (i=0; i < numIORanks; ++i) {
1595     ASSERT((fieldAssignments[i] >= 0) && (fieldAssignments[i] < nF));
1596     }
1597    
1598     // Embed the i/o rank assignments into an array holding
1599     // the assignments for *all* the ranks (i/o and compute).
1600     // Rank assignment of "-1" means "compute".
1601     j = 0;
1602     for (i = 0; i < numComputeRanks + numIORanks; ++i) {
1603     rankAssignments[i] = isIORank(i, totalNumNodes, numIONodes) ?
1604     fieldAssignments[j++] : -1;
1605     }
1606     // Sanity check. Check the assignment for this rank.
1607     if (isIO == thisRanksType) {
1608     ASSERT(fieldAssignments[newIntracommRank] == rankAssignments[parentRank]);
1609     } else {
1610     ASSERT(-1 == rankAssignments[parentRank]);
1611     }
1612     ASSERT(j == numIORanks);
1613    
1614     if (0 == parentRank) {
1615     printf("\ncpu assignments, epoch style %d\n", epochStyleIndex);
1616     for (i = 0; i < numComputeNodes + numIONodes ; ++i) {
1617     if (rankAssignments[i*numRanksPerNode] >= 0) {
1618     // i/o node
1619     for (j = 0; j < numRanksPerNode; ++j) {
1620     printf(" %1d", rankAssignments[i*numRanksPerNode + j]);
1621     }
1622     } else {
1623     // compute node
1624     for (j = 0; j < numRanksPerNode; ++j) {
1625     if ((i*numRanksPerNode + j) >= (numComputeRanks + numIORanks)) break;
1626     ASSERT(-1 == rankAssignments[i*numRanksPerNode + j]);
1627     }
1628     printf(" #");
1629     for (; j < numRanksPerNode; ++j) { // "excess" ranks (if any)
1630     printf("X");
1631     }
1632     }
1633     printf(" ");
1634     }
1635     printf("\n\n");
1636     }
1637    
1638     // Find the lowest rank assigned to computation; use it as
1639     // the "root" for the upcoming intercomm creates.
1640     for (compRoot = 0; rankAssignments[compRoot] != -1; ++compRoot);
1641     // Sanity check
1642     if ((isCompute == thisRanksType) && (0 == newIntracommRank)) ASSERT(compRoot == parentRank);
1643    
1644    
1645     // If this is an I/O rank, split the newIntracomm (which, for
1646     // the i/o ranks, is a communicator among all the i/o ranks)
1647     // into the pieces as assigned.
1648    
1649     if (isIO == thisRanksType) {
1650     MPI_Comm fieldIntracomm;
1651     int myField = rankAssignments[parentRank];
1652     ASSERT(myField >= 0);
1653    
1654     MPI_Comm_split(newIntracomm, myField, parentRank, &fieldIntracomm);
1655     thisEpochStyle[myField].ioRanksIntracomm = fieldIntracomm;
1656    
1657     // Now create an inter-communicator between the computational
1658     // ranks, and each of the newly split off field communicators.
1659     for (i = 0; i < nF; ++i) {
1660    
1661     // Do one field at a time to avoid clashes on parentComm
1662     MPI_Barrier(newIntracomm);
1663    
1664     if (myField != i) continue;
1665    
1666     // Create the intercomm for this field for this epoch style
1667     MPI_Intercomm_create(fieldIntracomm, 0, parentComm,
1668     compRoot, i, &(thisEpochStyle[myField].dataIntercomm));
1669    
1670     // ... and dup a separate one for tile registrations
1671     MPI_Comm_dup(thisEpochStyle[myField].dataIntercomm,
1672     &(thisEpochStyle[myField].registrationIntercomm));
1673     }
1674     }
1675     else {
1676     // This is a computational rank; create the intercommunicators
1677     // to the various split off separate field communicators.
1678    
1679     for (fieldIndex = 0; fieldIndex < nF; ++fieldIndex) {
1680    
1681     // Find the remote "root" process for this field
1682     int fieldRoot = -1;
1683     while (rankAssignments[++fieldRoot] != fieldIndex);
1684    
1685     // Create the intercomm for this field for this epoch style
1686     MPI_Intercomm_create(newIntracomm, 0, parentComm, fieldRoot,
1687     fieldIndex, &(thisEpochStyle[fieldIndex].dataIntercomm));
1688    
1689     // ... and dup a separate one for tile registrations
1690     MPI_Comm_dup(thisEpochStyle[fieldIndex].dataIntercomm,
1691     &(thisEpochStyle[fieldIndex].registrationIntercomm));
1692     }
1693     }
1694    
1695     } // epoch style loop
1696    
1697    
1698     // I/O processes split off and start receiving data
1699     // NOTE: the I/O processes DO NOT RETURN from this call
1700     if (isIO == thisRanksType) ioRankMain(totalNumTiles);
1701    
1702    
1703     // The "compute" processes now return with their new intra-communicator.
1704     *rtnComputeComm = newIntracomm;
1705    
1706     // But before returning, call the MPI-IO initialization routine.
1707     initSizesAndTypes();
1708    
1709     return;
1710     }
1711    
1712    
1713    
1714    
1715     // "Register" the tile-id(s) that this process will be sending
1716     void
1717     f2(int tileID)
1718     {
1719     int i, epochStyleIndex;
1720    
1721     static int count = 0;
1722    
1723     // This code assumes that the same tileID will apply to all the
1724     // fields. Note that we use the tileID as a tag, so we require it
1725     // be an int with a legal tag value, but we do *NOT* actually use
1726     // the tag *value* for anything (e.g. it is NOT used as an index
1727     // into an array). It is treated as an opaque handle that is
1728     // passed from the compute task(s) to the compositing routines.
1729     // Those two end-points likely assign meaning to the tileID (i.e.
1730     // use it to identify where the tile belongs within the domain),
1731     // but that is their business.
1732     //
1733     // A tileID of -1 signifies the end of registration.
1734    
1735     ASSERT(computeIntracommRank >= 0);
1736    
1737     if (tileID >= 0) {
1738     // In this version of the code, in addition to the tileID, we also
1739     // multiplex in the low order bit(s) of the epoch number as an
1740     // error check. So the tile value must be small enough to allow that.
1741     ASSERT(((tileID<<numCheckBits) + ((1<<numCheckBits)-1)) < maxTagValue);
1742    
1743     // In this version of the code, we do not send the actual tileID's
1744     // to the i/o processes during the registration procedure. We only
1745     // send a count of the number of tiles that we will send.
1746     ++count;
1747     return;
1748     }
1749    
1750    
1751     // We get here when we are called with a negative tileID, signifying
1752     // the end of registration. We now need to figure out and inform
1753     // *each* i/o process in *each* field just how many tiles we will be
1754     // sending them in *each* epoch style.
1755    
1756     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1757     fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1758    
1759     int curFieldIndex = -1;
1760     while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
1761     fieldInfoThisEpoch_t *thisField = &(thisEpochStyle[curFieldIndex]);
1762     int numRemoteRanks, *tileCounts, remainder;
1763     MPI_Comm_remote_size(thisField->dataIntercomm, &numRemoteRanks);
1764    
1765     tileCounts = alloca(numRemoteRanks * sizeof(int));
1766    
1767     memset(tileCounts,0,numRemoteRanks * sizeof(int));
1768    
1769     // Distribute the tiles among the i/o ranks.
1770     for (i = 0; i < numRemoteRanks; ++i) {
1771     tileCounts[i] = count / numRemoteRanks;
1772     }
1773     // Deal with any remainder
1774     remainder = count - ((count / numRemoteRanks) * numRemoteRanks);
1775     for (i = 0; i < remainder; ++i) {
1776     int target = (computeIntracommRank + i) % numRemoteRanks;
1777     tileCounts[target] += 1;
1778     }
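    // Worked example (illustrative numbers only): count = 10 tiles and
    // numRemoteRanks = 4 gives a base of 2 tiles per i/o rank with a
    // remainder of 2; a sender with computeIntracommRank = 3 bumps targets
    // 3 and 0, yielding tileCounts = {3, 2, 2, 3}.  Starting the rotation
    // at the sender's own rank spreads the leftover tiles over different
    // i/o ranks for different senders.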
1779    
1780     // Communicate these counts to the i/o processes for this
1781     // field. Note that we send a message to each process,
1782     // even if the count is zero.
1783     for (i = 0; i < numRemoteRanks; ++i) {
1784     MPI_Send(NULL, 0, MPI_BYTE, i, tileCounts[i],
1785     thisField->registrationIntercomm);
1786     }
1787    
1788     } // field loop
1789     } // epoch-style loop
1790    
1791     }
1792    
1793    
1794    
1795    
1796     int currentEpochID = 0;
1797     int currentEpochStyle = 0;
1798    
1799     void
1800     beginNewEpoch(int newEpochID, int gcmIter, int epochStyle)
1801     {
1802     fieldInfoThisEpoch_t *p;
1803    
1804     if (newEpochID != (currentEpochID + 1)) {
1805     fprintf(stderr, "ERROR: Missing i/o epoch? was %d, is now %d ??\n",
1806     currentEpochID, newEpochID);
1807     sleep(1); // Give the message a chance to propagate
1808     abort();
1809     }
1810    
1811     ////////////////////////////////////////////////////////////////////////
1812     // Need to be sure the i/o tasks are done processing the previous epoch
1813     // before any compute tasks start sending tiles from a new epoch.
1814    
1815     if (0 == computeIntracommRank) {
1816     int cmd[4] = { cmd_newEpoch, newEpochID, epochStyle, gcmIter };
1817    
1818     // Wait to get an ack that the i/o tasks are done with the prior epoch.
1819     MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
1820     globalIntercomm, MPI_STATUS_IGNORE);
1821    
1822     // Tell the i/o ranks about the new epoch.
1823     // (Just tell rank 0; it will bcast to the others)
1824     MPI_Send(cmd, 4, MPI_INT, 0, 0, globalIntercomm);
1825     }
1826    
1827     // Compute ranks wait here until rank 0 gets the ack from the i/o ranks
1828     MPI_Barrier(computeIntracomm);
1829    
1830    
1831     currentEpochID = newEpochID;
1832     currentEpochStyle = epochStyle;
1833    
1834     // Reset the tileCount (used by f3)
1835     for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
1836     p->tileCount = 0;
1837     }
1838     }
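    // Per-epoch handshake implemented above (rank 0 of each group only):
    //   compute rank 0  <-- cmd_epochComplete --  i/o rank 0   (prior epoch done)
    //   compute rank 0  --- cmd_newEpoch ------>  i/o rank 0   (bcast to other i/o ranks)
    // followed by a barrier so that no compute rank starts sending tiles
    // for the new epoch before the i/o side has been told about it.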
1839    
1840    
1841     void
1842     f3(char dataFieldID, int tileID, int epochID, void *data)
1843     {
1844     int whichField, receiver, tag;
1845    
1846     static char priorDataFieldID = '\0';
1847     static int priorEpoch = -1;
1848     static int remoteCommSize;
1849     static fieldInfoThisEpoch_t *p;
1850    
1851     int flag=0;
1852    
1853     // Check that this global has been set
1854     ASSERT(computeIntracommRank >= 0);
1855    
1856    
1857     // Figure out some info about this dataFieldID. It is
1858     // likely to be another tile from the same field as last time
1859     // we were called, in which case we can reuse the "static" values
1860     // retained from the prior call. If not, we need to look it up.
1861    
1862     if ((dataFieldID != priorDataFieldID) || (epochID != priorEpoch)) {
1863    
1864     // It's not the same; we need to look it up.
1865    
1866     for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
1867     if (p->dataFieldID == dataFieldID) break;
1868     }
1869     // Make sure we found a valid field
1870    
1871     ASSERT(p->dataIntercomm != MPI_COMM_NULL);
1872    
1873     MPI_Comm_remote_size(p->dataIntercomm, &remoteCommSize);
1874    
1875     flag=1;
1876    
1877     }
1878    
1879     ASSERT(p->dataFieldID == dataFieldID);
1880    
1881     receiver = (computeIntracommRank + p->tileCount) % remoteCommSize;
1882    
1883     tag = (tileID << numCheckBits) | (epochID & bitMask);
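    // Illustrative example (with numCheckBits == 2, bitMask == 3):
    // tileID = 189, epochID = 7 gives tag = (189 << 2) | (7 & 3) = 759;
    // the receiver recovers the tile as 759 >> 2 = 189 and can cross-check
    // the low bits, 759 & 3, against its own epoch counter.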
1884    
1885     //fprintf(stderr,"%d %d\n",flag,tileID);
1886    
1887     /*
1888     if (tileID==189){
1889     int i,j;
1890     for (i=0;i<TILE_Y;++i){
1891     for (j=0;j<TILE_X;++j)
1892     fprintf(stderr,"%f ",((double*)data)[872+i*108+j]);
1893     fprintf(stderr,"\n");
1894     }
1895     }
1896    
1897     */
1898    
1899    
1900    
1901     FPRINTF(stderr,"Rank %d sends field '%c', tile %d, to i/o task %d with tag %d(%d)\n",
1902     computeIntracommRank, dataFieldID, tileID, receiver, tag, tag >> numCheckBits);
1903    
1904     MPI_Send(data, tileOneZLevelSizeInBytes * p->zDepth,
1905     MPI_BYTE, receiver, tag, p->dataIntercomm);
1906    
1907     p->tileCount += 1;
1908     priorDataFieldID = dataFieldID;
1909     priorEpoch = epochID;
1910     }
1911    
1912    
1913    
1914     void
1915     f4(int epochID)
1916     {
1917     int i;
1918     ASSERT(computeIntracommRank >= 0);
1919    
1920     if (0 == computeIntracommRank) {
1921     int cmd[3] = { cmd_exit, -1, -1 };
1922    
1923     // Recv the ack that the prior i/o epoch is complete
1924     MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
1925     globalIntercomm, MPI_STATUS_IGNORE);
1926    
1927     // Tell the i/o ranks to exit
1928     // Just tell rank 0; it will bcast to the others
1929     MPI_Send(cmd, 3, MPI_INT, 0, 0, globalIntercomm);
1930     }
1931    
1932     }
1933    
1934    
1935    
1936     void myasyncio_set_global_sizes_(int *nx, int *ny, int *nz,
1937     int *nt, int *nb,
1938     int *tx, int *ty,
1939     int *ox, int *oy)
1940     {
1941    
1942    
1943     int data[] = {*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy};
1944    
1945     int items = sizeof(data)/sizeof(int);
1946    
1947    
1948     NTILES = *nt; // total number of tiles
1949    
1950    
1951     TILE_X = *tx;
1952     TILE_Y = *ty;
1953     XGHOSTS = *ox;
1954     YGHOSTS = *oy;
1955     tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
1956     tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // needed by compute ranks in f3
1957    
1958    
1959     int rank,ierr;
1960     MPI_Comm_rank(globalIntercomm,&rank);
1961    
1962    
1963     if (0==rank)
1964     printf("%d %d %d\n%d %d\n%d %d\n%d %d\n",*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy);
1965    
1966    
1967     /*
1968     if (0==rank)
1969     ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_ROOT,globalIntercomm);
1970     else
1971     ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1972     */
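    // Broadcast over an intercommunicator: in the group that owns the data
    // the designated root passes MPI_ROOT and every other member passes
    // MPI_PROC_NULL, while ranks in the remote (receiving) group pass the
    // root's rank.  The compute side is the root group here, so the i/o
    // side is expected to call MPI_Bcast with root 0 to receive "data".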
1973    
1974     if (0==rank)
1975     ierr=MPI_Bcast(data,items,MPI_INT,MPI_ROOT,globalIntercomm);
1976     else
1977     ierr=MPI_Bcast(data,items,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1978    
1979     }
1980    
1981     void asyncio_tile_arrays_(int *xoff, int *yoff, int *xskip)
1982     {
1983     int rank,ierr;
1984     MPI_Comm_rank(globalIntercomm,&rank);
1985    
1986     if (0==rank)
1987     ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1988     else
1989     ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1990    
1991     if (0==rank)
1992     ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1993     else
1994     ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1995    
1996     if (0==rank)
1997     ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1998     else
1999     ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
2000    
2001     }
2002    
2003    
2004    
2005    
2006    
2007     //////////////////////////////////////////////////////////////////////
2008     // Fortran interface
2009    
2010     void
2011     bron_f1(
2012     MPI_Fint *ptr_parentComm,
2013     int *ptr_numComputeRanks,
2014     int *ptr_totalNumTiles,
2015     MPI_Fint *rtnComputeComm)
2016     {
2017     // Convert the Fortran handle into a C handle
2018     MPI_Comm newComm, parentComm = MPI_Comm_f2c(*ptr_parentComm);
2019    
2020     f1(parentComm, *ptr_numComputeRanks, *ptr_totalNumTiles, &newComm);
2021    
2022     // Convert the C handle into a Fortran handle
2023     *rtnComputeComm = MPI_Comm_c2f(newComm);
2024     }
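    // Illustrative Fortran-side call (a sketch only; the exact external
    // symbol name depends on the compiler's name-mangling, which this file
    // assumes elsewhere, e.g. the trailing underscore on beginnewepoch_):
    //
    //       INTEGER parentComm, newComm, nCompute, nTiles
    //       parentComm = MPI_COMM_WORLD
    //       CALL bron_f1(parentComm, nCompute, nTiles, newComm)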
2025    
2026    
2027    
2028     void
2029     bron_f2(int *ptr_tileID)
2030     {
2031     f2(*ptr_tileID);
2032     }
2033    
2034    
2035     void
2036     beginnewepoch_(int *newEpochID, int *gcmIter, int *epochStyle)
2037     {
2038     beginNewEpoch(*newEpochID, *gcmIter, *epochStyle);
2039     }
2040    
2041    
2042     void
2043     bron_f3(char *ptr_dataFieldID, int *ptr_tileID, int *ptr_epochID, void *data)
2044     {
2045     f3(*ptr_dataFieldID, *ptr_tileID, *ptr_epochID, data);
2046     }
2047    
2048    
2049    
2050     void
2051     bron_f4(int *ptr_epochID)
2052     {
2053     f4(*ptr_epochID);
2054     }
2055    
