
Annotation of /MITgcm_contrib/llc_hires/llc_90/code-async-noseaice/recvTask.c



Revision 1.2
Fri Feb 7 15:39:16 2020 UTC by dimitri
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +632 -365 lines
File MIME type: text/plain
updating the no-seaice instructions with Bron's latest code

1 dimitri 1.1
2     // Code to do the m-on-n fan-in, recompositing, and output of data
3 dimitri 1.2 // tiles for MITgcm.
4 dimitri 1.1 //
5     // There are aspects of this code that would be simpler in C++, but
6     // we deliberately wrote the code in ansi-C to make linking it with
7     // the Fortran main code easier (should be able to just include the
8     // .o on the link line).
9    
10    
11     #include "PACKAGES_CONFIG.h"
12 dimitri 1.2 #include "SIZE.h"
13 dimitri 1.1
14     #include <stdio.h>
15     #include <unistd.h>
16     #include <stdlib.h>
17     #include <string.h>
18     #include <sys/types.h>
19     #include <sys/stat.h>
20     #include <fcntl.h>
21 dimitri 1.2 #include <assert.h>
22 dimitri 1.1
23     #include <mpi.h>
24     #include <alloca.h>
25    
26    
27     #include <stdint.h>
28 dimitri 1.2 #include <limits.h>
29 dimitri 1.1 #include <errno.h>
30    
31     #define DEBUG 1
32    
33 dimitri 1.2 #if (DEBUG >= 3)
34 dimitri 1.1 #define FPRINTF fprintf
35     #else
36     #include <stdarg.h>
37     void FPRINTF(FILE *fp,...){return;}
38     #endif
39    
40    
41 dimitri 1.2 #if (DEBUG >= 2)
42 dimitri 1.1 // Define our own version of "assert", where we sleep rather than abort.
43     // This makes it easier to attach the debugger.
44     #define ASSERT(_expr) \
45     if (!(_expr)) { \
46     fprintf(stderr, "ASSERT failed for pid %d : `%s': %s: %d: %s\n", \
47     getpid(), #_expr, __FILE__, __LINE__, __func__); \
48     sleep(9999); \
49     }
50     #else
51 dimitri 1.2 // Use the standard version of "assert"
52 dimitri 1.1 #define ASSERT assert
53     #endif
54    
55    
56 dimitri 1.2 // The hostname that each rank is running on. Currently, only rank 0
57     // uses this. It is allocated and deallocated in routine "f1"
58     char (*allHostnames)[HOST_NAME_MAX+1] = NULL;
59    
60 dimitri 1.1
61     // If numRanksPerNode is set to be > 0, we just use that setting.
62     // If it is <= 0, we dynamically determine the number of cores on
63     // a node, and use that. (N.B.: we assume *all* nodes being used in
64 dimitri 1.2 // the run have the *same* number of MPI processes on each.)
65 dimitri 1.1 int numRanksPerNode = 0;
66    
67     // Just an error check; can be zero (if you're confident it's all correct).
68     #define numCheckBits 2
69     // This bitMask definition works for 2's complement
70     #define bitMask ((1 << numCheckBits) - 1)
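// (Descriptive note, inferred from the decoding in tryToReceiveDataTile below:
// the MPI tag on a data-tile message packs the tileID in the high bits and the
// low numCheckBits of the epoch ID in the low bits, i.e.
//     tag = (tileID << numCheckBits) | (epochID & bitMask)
// For example, with numCheckBits == 2, tileID 37 in epoch 6 gives tag
// (37 << 2) | (6 & 3) == 150, which decodes back as 150 >> 2 == 37 and
// 150 & 3 == 2 == (6 & 3).)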
71    
72    
73     ///////////////////////////////////////////////////////////////////////
74 dimitri 1.2 // Fundamental info about the data fields. Most (but not all) of
75     // this is copied or derived from the values in "SIZE.h"
76 dimitri 1.1
77     // double for now. Might also be float someday
78     #define datumType double
79     #define datumSize (sizeof(datumType))
80    
81    
82     // Info about the data fields. We assume that all the fields have the
83     // same X and Y characteristics, but may have either 1 or NUM_Z levels.
84 dimitri 1.2 // bcn: or MULTDIM levels (evidently used by "sea ice").
85    
86     // N.B. Please leave the seemingly unneeded "(long int)" casts in
87     // place. These individual values may not need them, but they get
88     // used in calculations where the extra length might be important.
89     #define NUM_X ((long int) sFacet)
90     #define NUM_Y (((long int) sFacet) * 13)
91     #define NUM_Z ((long int) Nr)
92     #define MULTDIM ((long int) 7)
93    
94     // Some values derived from the above constants
95 dimitri 1.1 #define twoDFieldSizeInBytes (NUM_X * NUM_Y * 1 * datumSize)
96     #define threeDFieldSizeInBytes (twoDFieldSizeInBytes * NUM_Z)
97     #define multDFieldSizeInBytes (twoDFieldSizeInBytes * MULTDIM)
98    
99     // Info about the data tiles. We assume that all the tiles are the same
100     // size (no odd-sized last piece), they all have the same X and Y
101     // characteristics (including ghosting), and are full depth in Z
102     // (either 1, or NUM_Z, or MULTDIM, as appropriate).
103 dimitri 1.2 #define TILE_X sNx
104     #define TILE_Y sNy
105     #define XGHOSTS OLx
106     #define YGHOSTS OLy
107     #define TOTAL_NUM_TILES ((sFacet / sNx) * (sFacet / sNy) * 13)
108     #define NUM_WET_TILES (nPx * nPy)
109    
110     // Size of one Z level of an individual tile. This is the "in memory"
111     // size, including ghosting.
112     #define tileOneZLevelItemCount (((long)(TILE_X + 2*XGHOSTS)) * (TILE_Y + 2*YGHOSTS))
113     #define tileOneZLevelSizeInBytes (tileOneZLevelItemCount * datumSize)
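// (Worked example with hypothetical SIZE.h values, not taken from this file:
// with sNx = sNy = 30 and OLx = OLy = 4, one Z level of a tile occupies
// (30+8)*(30+8) = 1444 doubles = 11552 bytes in memory, of which only the
// 30*30 = 900 interior points are ever copied to the output buffer; the ghost
// cells are stripped in processSlabSection.)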
114    
115    
116 dimitri 1.1
117 dimitri 1.2 //////////////////////////////////////////////////////////////////////
118     // Define the depth (i.e. number of Z-levels) of the various fields.
119 dimitri 1.1
120     typedef struct dataFieldDepth {
121     char dataFieldID;
122     int numZ;
123     } dataFieldDepth_t;
124    
125     dataFieldDepth_t fieldDepths[] = {
126     //DM { 'A', MULTDIM }, // seaice, 7 == MULTDIM in SEAICE_SIZE.h
127    
128     { 'B', 1 },
129     { 'C', 1 },
130     { 'D', 1 },
131     { 'E', 1 },
132     { 'F', 1 },
133     { 'G', 1 },
134     { 'H', 1 },
135     { 'I', 1 },
136     { 'J', 1 },
137     { 'K', 1 },
138     { 'L', 1 },
139     { 'M', 1 },
140     { 'N', 1 },
141     { 'O', 1 },
142     { 'P', 1 },
143     { 'Q', 1 },
144     { 'R', 1 },
145    
146     { 'S', NUM_Z },
147     { 'T', NUM_Z },
148     { 'U', NUM_Z },
149     { 'V', NUM_Z },
150     { 'W', NUM_Z },
151     { 'X', NUM_Z },
152     { 'Y', NUM_Z },
153     };
154     #define numAllFields (sizeof(fieldDepths)/sizeof(dataFieldDepth_t))
155    
156    
157     ///////////////////////////////////////////////////////////////////////
158     // Info about the various kinds of i/o epochs
159     typedef struct epochFieldInfo {
160     char dataFieldID;
161     MPI_Comm registrationIntercomm;
162     MPI_Comm dataIntercomm;
163 dimitri 1.2 MPI_Comm ioRanksIntracomm; // i/o ranks for *this* field
164     long int tileCount;
165 dimitri 1.1 int zDepth; // duplicates the fieldDepth entry; filled in automatically
166 dimitri 1.2 int *chunkWriters; // Which rank writes the i'th chunk of z-levels
167     int nChunks; // length of chunkWriters array
168 dimitri 1.1 char filenameTemplate[128];
169     long int offset;
170     int pickup;
171     } fieldInfoThisEpoch_t;
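// (Descriptive note: the initializers in the tables below follow the member
// order above, i.e.
//   { dataFieldID, registrationIntercomm, dataIntercomm, ioRanksIntracomm,
//     tileCount, zDepth, chunkWriters, nChunks, filenameTemplate, offset, pickup }
// where zDepth == -1 means "filled in automatically from fieldDepths[]".)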
172    
173 dimitri 1.2 // The normal i/o dump - style 0
174 dimitri 1.1 fieldInfoThisEpoch_t fieldsForEpochStyle_0[] = {
175 dimitri 1.2 { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "U.%010d.%s", 0, 0 },
176     { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "V.%010d.%s", 0, 0 },
177     { 'W', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "W.%010d.%s", 0, 0 },
178     { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "Salt.%010d.%s", 0, 0 },
179     { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "Theta.%010d.%s", 0,0 },
180     { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "Eta.%010d.%s", 0,0 },
181    
182     //DM { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIarea.%010d.%s", 0,0 },
183     //DM { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIheff.%010d.%s", 0,0 },
184     //DM { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIhsnow.%010d.%s", 0,0 },
185     //DM { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIuice.%010d.%s", 0,0 },
186     //DM { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIvice.%010d.%s", 0,0 },
187     //DM { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIhsalt.%010d.%s", 0,0 },
188    
189     //{ 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "EtaHnm1.%010d.%s", 0,0 },
190     { 'I', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceTAUX.%010d.%s", 0,0 },
191     { 'J', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceTAUY.%010d.%s", 0,0 },
192     { 'K', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "KPPhbl.%010d.%s", 0,0 },
193     { 'L', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceSflux.%010d.%s", 0,0 },
194    
195     { 'M', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceFWflx.%010d.%s", 0,0 },
196     { 'O', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceQnet.%010d.%s", 0,0 },
197     { 'P', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "PhiBot.%010d.%s", 0,0 },
198     { 'Q', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceQsw.%010d.%s", 0,0 },
199     //{ 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "dEtaHdt.%010d.%s", 0,0 },
200 dimitri 1.1
201 dimitri 1.2 {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "", 0,0 },
202 dimitri 1.1 };
203    
204    
205 dimitri 1.2 // pickup file dump - style 1
206     // NOTE: if this changes, write_pickup_meta will also need to be changed
207 dimitri 1.1 fieldInfoThisEpoch_t fieldsForEpochStyle_1[] = {
208 dimitri 1.2 { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 1},
209     { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 1},
210     { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 2 + twoDFieldSizeInBytes * 0, 1},
211     { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 3 + twoDFieldSizeInBytes * 0, 1},
212     { 'X', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 4 + twoDFieldSizeInBytes * 0, 1},
213     { 'Y', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 5 + twoDFieldSizeInBytes * 0, 1},
214     { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 0, 1},
215     { 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 1, 1},
216     { 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 2, 1},
217     {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "", 0 ,1},
218 dimitri 1.1 };
219    
220    
221 dimitri 1.2 // seaice pickup dump - style 2
222     // NOTE: if this changes, write_pickup_meta will also need to be changed
223 dimitri 1.1 //DMfieldInfoThisEpoch_t fieldsForEpochStyle_2[] = {
224 dimitri 1.2 //DM { 'A', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 2},
225     //DM { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 2},
226     //DM { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 1, 2},
227     //DM { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 2, 2},
228     //DM { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 3, 2},
229     //DM { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 4, 2},
230     //DM { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 5, 2},
231     //DM {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "", 0 },
232 dimitri 1.1 //DM};
233    
234    
235     fieldInfoThisEpoch_t *epochStyles[] = {
236     fieldsForEpochStyle_0,
237     fieldsForEpochStyle_1,
238     //DM fieldsForEpochStyle_2,
239     };
240     int numEpochStyles = (sizeof(epochStyles) / sizeof(fieldInfoThisEpoch_t *));
241    
242    
243     typedef enum {
244     cmd_illegal,
245     cmd_newEpoch,
246     cmd_epochComplete,
247     cmd_exit,
248     } epochCmd_t;
249    
250    
251     ///////////////////////////////////////////////////////////////////////
252    
253     // Note that a rank will only have access to one of the Intracomms,
254     // but all ranks will define the Intercomm.
255     MPI_Comm ioIntracomm = MPI_COMM_NULL;
256     int ioIntracommRank = -1;
257     MPI_Comm computeIntracomm = MPI_COMM_NULL;
258     int computeIntracommRank = -1;
259     MPI_Comm globalIntercomm = MPI_COMM_NULL;
260    
261    
262     #define divCeil(_x,_y) (((_x) + ((_y) - 1)) / (_y))
263     #define roundUp(_x,_y) (divCeil((_x),(_y)) * (_y))
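// e.g. divCeil(50, 16) == 4 and roundUp(50, 16) == 64 (for positive integer args)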
264    
265    
266     typedef enum {
267     bufState_illegal,
268     bufState_Free,
269     bufState_InUse,
270     } bufState_t;
271    
272    
273     typedef struct buf_header{
274     struct buf_header *next;
275     bufState_t state;
276     int requestsArraySize;
277     MPI_Request *requests;
278 dimitri 1.2 char *payload;
279     } bufHdr_t;
280 dimitri 1.1
281    
282     bufHdr_t *freeTileBufs_ptr = NULL;
283     bufHdr_t *inUseTileBufs_ptr = NULL;
284    
285     int maxTagValue = -1;
286 dimitri 1.2
287 dimitri 1.1
288     // routine to convert double to float during memcpy
289     // byte swapping still needs to be added in here as well
290     void memcpy_r8_2_r4 (float *f, double *d, long long *n)
291     {
292     long long i, rem;
293     rem = *n%16LL;
294     for (i = 0; i < rem; i++) {
295     f [i] = d [i];
296     }
297     for (i = rem; i < *n; i += 16) {
298     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 10" : : "m" (d [i + 256 + 0]) );
299     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 11" : : "m" (f [i + 256 + 0]) );
300     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm0 # memcpy_r8_2_r4.c 12" : : "m" (d [i + 0]) : "%xmm0");
301     __asm__ __volatile__ ("movss %%xmm0, %0 # memcpy_r8_2_r4.c 13" : "=m" (f [i + 0]) : : "memory");
302     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm1 # memcpy_r8_2_r4.c 14" : : "m" (d [i + 1]) : "%xmm1");
303     __asm__ __volatile__ ("movss %%xmm1, %0 # memcpy_r8_2_r4.c 15" : "=m" (f [i + 1]) : : "memory");
304     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm2 # memcpy_r8_2_r4.c 16" : : "m" (d [i + 2]) : "%xmm2");
305     __asm__ __volatile__ ("movss %%xmm2, %0 # memcpy_r8_2_r4.c 17" : "=m" (f [i + 2]) : : "memory");
306     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm3 # memcpy_r8_2_r4.c 18" : : "m" (d [i + 3]) : "%xmm3");
307     __asm__ __volatile__ ("movss %%xmm3, %0 # memcpy_r8_2_r4.c 19" : "=m" (f [i + 3]) : : "memory");
308     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm4 # memcpy_r8_2_r4.c 20" : : "m" (d [i + 4]) : "%xmm4");
309     __asm__ __volatile__ ("movss %%xmm4, %0 # memcpy_r8_2_r4.c 21" : "=m" (f [i + 4]) : : "memory");
310     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm5 # memcpy_r8_2_r4.c 22" : : "m" (d [i + 5]) : "%xmm5");
311     __asm__ __volatile__ ("movss %%xmm5, %0 # memcpy_r8_2_r4.c 23" : "=m" (f [i + 5]) : : "memory");
312     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm6 # memcpy_r8_2_r4.c 24" : : "m" (d [i + 6]) : "%xmm6");
313     __asm__ __volatile__ ("movss %%xmm6, %0 # memcpy_r8_2_r4.c 25" : "=m" (f [i + 6]) : : "memory");
314     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm7 # memcpy_r8_2_r4.c 26" : : "m" (d [i + 7]) : "%xmm7");
315     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 27" : : "m" (d [i + 256 + 8 + 0]) );
316     __asm__ __volatile__ ("movss %%xmm7, %0 # memcpy_r8_2_r4.c 28" : "=m" (f [i + 7]) : : "memory");
317     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm8 # memcpy_r8_2_r4.c 29" : : "m" (d [i + 8]) : "%xmm8");
318     __asm__ __volatile__ ("movss %%xmm8, %0 # memcpy_r8_2_r4.c 30" : "=m" (f [i + 8]) : : "memory");
319     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm9 # memcpy_r8_2_r4.c 31" : : "m" (d [i + 9]) : "%xmm9");
320     __asm__ __volatile__ ("movss %%xmm9, %0 # memcpy_r8_2_r4.c 32" : "=m" (f [i + 9]) : : "memory");
321     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm10 # memcpy_r8_2_r4.c 33" : : "m" (d [i + 10]) : "%xmm10");
322     __asm__ __volatile__ ("movss %%xmm10, %0 # memcpy_r8_2_r4.c 34" : "=m" (f [i + 10]) : : "memory");
323     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm11 # memcpy_r8_2_r4.c 35" : : "m" (d [i + 11]) : "%xmm11");
324     __asm__ __volatile__ ("movss %%xmm11, %0 # memcpy_r8_2_r4.c 36" : "=m" (f [i + 11]) : : "memory");
325     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm12 # memcpy_r8_2_r4.c 37" : : "m" (d [i + 12]) : "%xmm12");
326     __asm__ __volatile__ ("movss %%xmm12, %0 # memcpy_r8_2_r4.c 38" : "=m" (f [i + 12]) : : "memory");
327     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm13 # memcpy_r8_2_r4.c 39" : : "m" (d [i + 13]) : "%xmm13");
328     __asm__ __volatile__ ("movss %%xmm13, %0 # memcpy_r8_2_r4.c 40" : "=m" (f [i + 13]) : : "memory");
329     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm14 # memcpy_r8_2_r4.c 41" : : "m" (d [i + 14]) : "%xmm14");
330     __asm__ __volatile__ ("movss %%xmm14, %0 # memcpy_r8_2_r4.c 42" : "=m" (f [i + 14]) : : "memory");
331     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm15 # memcpy_r8_2_r4.c 43" : : "m" (d [i + 15]) : "%xmm15");
332     __asm__ __volatile__ ("movss %%xmm15, %0 # memcpy_r8_2_r4.c 44" : "=m" (f [i + 15]) : : "memory");
333     }
334     }
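
// For reference only: a portable, unvectorized sketch of what the loop above
// does -- straight double-to-float narrowing; the byte swapping is handled
// separately in do_write.  This is an illustrative equivalent (the name
// memcpy_r8_2_r4_portable is hypothetical and not used elsewhere), compiled
// out so it cannot affect the build.
#if 0
static void memcpy_r8_2_r4_portable(float *f, const double *d, const long long *n)
{
    long long i;
    for (i = 0; i < *n; ++i)
        f[i] = (float) d[i];
}
#endif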
335    
336 dimitri 1.2
337 dimitri 1.1 // Debug routine
338     void countBufs(int nbufs)
339     {
340     int nInUse, nFree;
341     bufHdr_t *bufPtr;
342     static int target = -1;
343    
344     if (-1 == target) {
345     ASSERT(-1 != nbufs);
346     target = nbufs;
347     }
348    
349     bufPtr = freeTileBufs_ptr;
350     for (nFree = 0; bufPtr != NULL; bufPtr = bufPtr->next) nFree += 1;
351    
352     bufPtr = inUseTileBufs_ptr;
353     for (nInUse = 0; bufPtr != NULL; bufPtr = bufPtr->next) nInUse += 1;
354    
355     if (nInUse + nFree != target) {
356     fprintf(stderr, "Rank %d: bad number of buffs: free %d, inUse %d, should be %d\n",
357     ioIntracommRank, nFree, nInUse, target);
358     }
359 dimitri 1.2 ASSERT ((nInUse + nFree) == target);
360 dimitri 1.1 }
361    
362 dimitri 1.2
363     long int readn(int fd, void *p, long int nbytes)
364 dimitri 1.1 {
365 dimitri 1.2
366 dimitri 1.1 char *ptr = (char*)(p);
367    
368 dimitri 1.2 long int nleft, nread;
369 dimitri 1.1
370     nleft = nbytes;
371     while (nleft > 0){
372     nread = read(fd, ptr, nleft);
373     if (nread < 0)
374     return(nread); // error
375     else if (nread == 0)
376     break; // EOF
377    
378     nleft -= nread;
379     ptr += nread;
380     }
381     return (nbytes - nleft);
382     }
383    
384    
385     ssize_t writen(int fd, void *p, size_t nbytes)
386     {
387     char *ptr = (char*)(p);
388    
389     size_t nleft;
390     ssize_t nwritten;
391    
392     nleft = nbytes;
393     while (nleft > 0){
394     nwritten = write(fd, ptr, nleft);
395     if (nwritten <= 0){
396     if (errno==EINTR) continue; // POSIX, not SVr4
397     return(nwritten); // non-EINTR error
398     }
399    
400     nleft -= nwritten;
401     ptr += nwritten;
402     }
403     return(nbytes - nleft);
404     }
405    
406    
407     void write_pickup_meta(FILE *fp, int gcmIter, int pickup)
408     {
409     int i;
410     int ndims = 2;
411     int nrecords,nfields;
412     char**f;
413     char*fld1[] = {"Uvel","Vvel","Theta","Salt","GuNm1","GvNm1","EtaN","dEtaHdt","EtaH"};
414     char*fld2[] = {"siTICES","siAREA","siHEFF","siHSNOW","siHSALT","siUICE","siVICE"};
415     // for now, just list the fields here. When the whole field specification apparatus
416     // is cleaned up, pull the names out of the epochstyle definition or whatever
417    
418     ASSERT(1==pickup||2==pickup);
419    
420     if (1==pickup){
421     nrecords = 6*NUM_Z+3;
422     nfields = sizeof(fld1)/sizeof(char*);
423     f = fld1;
424     }
425     else if (2==pickup){
426     nrecords = MULTDIM+6;
427     nfields = sizeof(fld2)/sizeof(char*);
428     f = fld2;
429     }
430    
431     fprintf(fp," nDims = [ %3d ];\n",ndims);
432     fprintf(fp," dimList = [\n");
433 dimitri 1.2 fprintf(fp," %10ld,%10d,%10ld,\n",NUM_X,1,NUM_X);
434 dimitri 1.1 fprintf(fp," %10ld,%10d,%10ld\n",NUM_Y,1,NUM_Y);
435     fprintf(fp," ];\n");
436     fprintf(fp," dataprec = [ 'float64' ];\n");
437     fprintf(fp," nrecords = [ %5d ];\n",nrecords);
438     fprintf(fp," timeStepNumber = [ %10d ];\n",gcmIter);
439     fprintf(fp," timeInterval = [ %19.12E ];\n",0.0); // what should this be?
440     fprintf(fp," nFlds = [ %4d ];\n",nfields);
441     fprintf(fp," fldList = {\n");
442     for (i=0;i<nfields;++i)
443     fprintf(fp," '%-8s'",f[i]);
444     fprintf(fp,"\n };\n");
445     }
446    
447    
448    
449 dimitri 1.2 double *outBuf=NULL;//was [NUM_X*NUM_Y*NUM_Z], but only needs to be myNumZSlabs
450 dimitri 1.1 size_t outBufSize=0;
451    
452    
453     void
454 dimitri 1.2 do_write(int io_epoch, fieldInfoThisEpoch_t *whichField, int myFirstZ, int myNumZ, int gcmIter)
455 dimitri 1.1 {
456 dimitri 1.2 if (0==myNumZ) return;
457 dimitri 1.1
458     int pickup = whichField->pickup;
459    
460     ///////////////////////////////
461     // swap here, if necessary
462     // int i,j;
463    
464 dimitri 1.2 //i = NUM_X*NUM_Y*myNumZ;
465 dimitri 1.1 //mds_byteswapr8_(&i,outBuf);
466    
467     // mds_byteswapr8 expects an integer count, which is gonna overflow someday
468     // can't redefine to long without affecting a bunch of other calls
469     // so do a slab at a time here, to delay the inevitable
470     // i = NUM_X*NUM_Y;
471 dimitri 1.2 //for (j=0;j<myNumZ;++j)
472 dimitri 1.1 // mds_byteswapr8_(&i,&outBuf[i*j]);
473    
474     // gnu builtin evidently honored by intel compilers
475    
476     if (pickup) {
477     uint64_t *alias = (uint64_t*)outBuf;
478     size_t i;
479 dimitri 1.2 for (i=0;i<NUM_X*NUM_Y*myNumZ;++i)
480 dimitri 1.1 alias[i] = __builtin_bswap64(alias[i]);
481     }
482     else {
483     uint32_t *alias = (uint32_t*)outBuf;
484     size_t i;
485 dimitri 1.2 for (i=0;i<NUM_X*NUM_Y*myNumZ;++i)
486 dimitri 1.1 alias[i] = __builtin_bswap32(alias[i]);
487     }
488    
489     // end of swappiness
490     //////////////////////////////////
491    
492     char s[1024];
493     //sprintf(s,"henze_%d_%d_%c.dat",io_epoch,gcmIter,whichField->dataFieldID);
494    
495     sprintf(s,whichField->filenameTemplate,gcmIter,"data");
496    
497     int fd = open(s,O_CREAT|O_WRONLY,S_IRWXU|S_IRGRP);
498     ASSERT(fd!=-1);
499    
500     size_t nbytes;
501    
502     if (pickup) {
503     lseek(fd,whichField->offset,SEEK_SET);
504 dimitri 1.2 lseek(fd,myFirstZ*NUM_X*NUM_Y*datumSize,SEEK_CUR);
505     nbytes = NUM_X*NUM_Y*myNumZ*datumSize;
506 dimitri 1.1 }
507     else {
508 dimitri 1.2 lseek(fd,myFirstZ*NUM_X*NUM_Y*sizeof(float),SEEK_CUR);
509     nbytes = NUM_X*NUM_Y*myNumZ*sizeof(float);
510 dimitri 1.1 }
511    
512     ssize_t bwrit = writen(fd,outBuf,nbytes);
513    
514     if (-1==bwrit) perror("Henze : file write problem : ");
515    
516 dimitri 1.2 FPRINTF(stderr,"Wrote %d of %d bytes (%d -> %d) \n",bwrit,nbytes,myFirstZ,myNumZ);
517 dimitri 1.1
518     // ASSERT(nbytes == bwrit);
519    
520     if (nbytes!=bwrit)
521     fprintf(stderr,"WROTE %ld /%ld\n",bwrit,nbytes);
522    
523     close(fd);
524    
525    
526     return;
527     }
528    
529    
530    
531     typedef struct {
532 dimitri 1.2 long int off;
533     long int skip;
534 dimitri 1.1 } tile_layout_t;
535    
536     tile_layout_t *offsetTable;
537    
538     void
539     processSlabSection(
540     fieldInfoThisEpoch_t *whichField,
541     int tileID,
542     void *data,
543     int myNumZSlabs)
544     {
545     int z;
546    
547     int pickup = whichField->pickup;
548    
549 dimitri 1.2 ASSERT ((tileID > 0) && (tileID <= TOTAL_NUM_TILES));
550 dimitri 1.1
551     if (myNumZSlabs * twoDFieldSizeInBytes > outBufSize){
552    
553     free(outBuf);
554    
555     outBufSize = myNumZSlabs * twoDFieldSizeInBytes;
556    
557     outBuf = malloc(outBufSize);
558     ASSERT(outBuf);
559    
560     memset(outBuf,0,outBufSize);
561     }
562    
563     for (z=0;z<myNumZSlabs;++z){
564    
565 dimitri 1.2 off_t zoffset = z*TILE_X*TILE_Y*TOTAL_NUM_TILES;
566 dimitri 1.1 off_t hoffset = offsetTable[tileID].off;
567     off_t skipdst = offsetTable[tileID].skip;
568    
569     //double *dst = outBuf + zoffset + hoffset;
570     void *dst;
571     if (pickup)
572     dst = outBuf + zoffset + hoffset;
573     else
574     dst = (float*)outBuf + zoffset + hoffset;
575    
576     off_t zoff = z*(TILE_X+2*XGHOSTS)*(TILE_Y+2*YGHOSTS);
577     off_t hoff = YGHOSTS*(TILE_X+2*XGHOSTS) + YGHOSTS;
578     double *src = (double*)data + zoff + hoff;
579    
580     off_t skipsrc = TILE_X+2*XGHOSTS;
581    
582     long long n = TILE_X;
583    
584     int y;
585     if (pickup)
586     for (y=0;y<TILE_Y;++y)
587     memcpy((double*)dst + y * skipdst, src + y * skipsrc, TILE_X*datumSize);
588     else
589     for (y=0;y<TILE_Y;++y)
590     memcpy_r8_2_r4((float*)dst + y * skipdst, src + y * skipsrc, &n);
591     }
592    
593     return;
594     }
595    
596    
597    
598     // Allocate some buffers to receive tile-data from the compute ranks
599     void
600     allocateTileBufs(int numTileBufs, int maxIntracommSize)
601     {
602     int i, j;
603     for (i = 0; i < numTileBufs; ++i) {
604    
605     bufHdr_t *newBuf = malloc(sizeof(bufHdr_t));
606     ASSERT(NULL != newBuf);
607    
608     newBuf->payload = malloc(tileOneZLevelSizeInBytes * NUM_Z);
609     ASSERT(NULL != newBuf->payload);
610    
611     newBuf->requests = malloc(maxIntracommSize * sizeof(MPI_Request));
612     ASSERT(NULL != newBuf->requests);
613    
614     // Init some values
615     newBuf->requestsArraySize = maxIntracommSize;
616     for (j = 0; j < maxIntracommSize; ++j) {
617     newBuf->requests[j] = MPI_REQUEST_NULL;
618     }
619    
620     // Put the buf into the free list
621     newBuf->state = bufState_Free;
622     newBuf->next = freeTileBufs_ptr;
623     freeTileBufs_ptr = newBuf;
624     }
625     }
626    
627    
628    
629     bufHdr_t *
630     getFreeBuf()
631     {
632     int j;
633     bufHdr_t *rtnValue = freeTileBufs_ptr;
634    
635     if (NULL != rtnValue) {
636     ASSERT(bufState_Free == rtnValue->state);
637     freeTileBufs_ptr = rtnValue->next;
638     rtnValue->next = NULL;
639    
640     // Paranoia. This should already be the case
641     for (j = 0; j < rtnValue->requestsArraySize; ++j) {
642     rtnValue->requests[j] = MPI_REQUEST_NULL;
643     }
644     }
645     return rtnValue;
646     }
647    
648    
649     /////////////////////////////////////////////////////////////////
650    
651    
652     bufHdr_t *
653     tryToReceiveDataTile(
654     MPI_Comm dataIntercomm,
655     int epochID,
656     size_t expectedMsgSize,
657     int *tileID)
658     {
659     bufHdr_t *bufHdr;
660     int pending, i, count;
661     MPI_Status mpiStatus;
662    
663     MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, dataIntercomm,
664     &pending, &mpiStatus);
665    
666     // If no data are pending, or if we can't get a buffer to
667     // put the pending data into, return NULL
668     if (!pending) return NULL;
669     if (((bufHdr = getFreeBuf()) == NULL)) {
670     FPRINTF(stderr,"tile %d(%d) pending, but no buf to recv it\n",
671     mpiStatus.MPI_TAG, mpiStatus.MPI_TAG >> numCheckBits);
672     return NULL;
673     }
674    
675     // Do sanity checks on the pending message
676     MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
677     ASSERT(count == expectedMsgSize);
678     ASSERT((mpiStatus.MPI_TAG & bitMask) == (epochID & bitMask));
679    
680     // Receive the data we saw in the iprobe
681     MPI_Recv(bufHdr->payload, expectedMsgSize, MPI_BYTE,
682     mpiStatus.MPI_SOURCE, mpiStatus.MPI_TAG,
683     dataIntercomm, &mpiStatus);
684     bufHdr->state = bufState_InUse;
685    
686     // Return values
687     *tileID = mpiStatus.MPI_TAG >> numCheckBits;
688    
689    
690     FPRINTF(stderr, "recv tile %d(%d) from compute rank %d\n",
691     mpiStatus.MPI_TAG, *tileID, mpiStatus.MPI_SOURCE);
692    
693     return bufHdr;
694     }
695    
696    
697    
698     int
699     tryToReceiveZSlab(
700     void *buf,
701 dimitri 1.2 long int expectedMsgSize,
702 dimitri 1.1 MPI_Comm intracomm)
703     {
704     MPI_Status mpiStatus;
705     int pending, count;
706    
707     MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, intracomm, &pending, &mpiStatus);
708     if (!pending) return -1;
709    
710     MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
711     ASSERT(count == expectedMsgSize);
712    
713     MPI_Recv(buf, count, MPI_BYTE, mpiStatus.MPI_SOURCE,
714     mpiStatus.MPI_TAG, intracomm, &mpiStatus);
715    
716     FPRINTF(stderr, "recv slab %d from rank %d\n",
717     mpiStatus.MPI_TAG, mpiStatus.MPI_SOURCE);
718    
719     // return the tileID
720     return mpiStatus.MPI_TAG;
721     }
722    
723    
724     void
725     redistributeZSlabs(
726     bufHdr_t *bufHdr,
727     int tileID,
728     int zSlabsPer,
729 dimitri 1.2 fieldInfoThisEpoch_t *fieldInfo)
730 dimitri 1.1 {
731 dimitri 1.2 int thisFieldNumZ = fieldInfo->zDepth;
732     long int tileSizeInBytes = tileOneZLevelSizeInBytes * thisFieldNumZ;
733     long int fullPieceSize = zSlabsPer * tileOneZLevelSizeInBytes;
734     long int offset = 0;
735    
736     MPI_Comm intracomm = fieldInfo->ioRanksIntracomm;
737    
738     // Note that the MPI interface definition requires that this arg
739     // (i.e. "pieceSize") be of type "int". So check to be sure we
740     // haven't overflowed the size.
741     int pieceSize = (int)fullPieceSize;
742     if (pieceSize != fullPieceSize) {
743     fprintf(stderr, "ERROR: pieceSize:%d, fullPieceSize:%ld\n",
744     pieceSize, fullPieceSize);
745     fprintf(stderr, "ERROR: tileOneZLevelSizeInBytes:%ld, tileSizeInBytes:%ld\n",
746     tileOneZLevelSizeInBytes, tileSizeInBytes);
747     fprintf(stderr, "ERROR: thisFieldNumZ:%d, zSlabsPer:%d\n",
748     thisFieldNumZ , zSlabsPer);
749     }
750     ASSERT (pieceSize == fullPieceSize);
751     ASSERT (pieceSize > 0);
752    
753    
754     // Send each piece (aka chunk) to the rank that is responsible
755     // for writing it.
756     int curPiece = 0;
757 dimitri 1.1 while ((offset + pieceSize) <= tileSizeInBytes) {
758 dimitri 1.2 int recvRank = fieldInfo->chunkWriters[curPiece];
759 dimitri 1.1 MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
760     tileID, intracomm, &(bufHdr->requests[recvRank]));
761     ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
762    
763     offset += pieceSize;
764 dimitri 1.2 curPiece += 1;
765 dimitri 1.1 }
766    
767 dimitri 1.2 // There might be one last odd-sized piece (N.B.: odd-sized in Z;
768     // the X and Y sizes of a tile are always the same).
769 dimitri 1.1 if (offset < tileSizeInBytes) {
770 dimitri 1.2 ASSERT (pieceSize >= tileSizeInBytes - offset);
771 dimitri 1.1 pieceSize = tileSizeInBytes - offset;
772     ASSERT(pieceSize % tileOneZLevelSizeInBytes == 0);
773    
774 dimitri 1.2 int recvRank = fieldInfo->chunkWriters[curPiece];
775 dimitri 1.1 MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
776     tileID, intracomm, &(bufHdr->requests[recvRank]));
777     ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
778 dimitri 1.2 curPiece += 1;
779 dimitri 1.1 }
780    
781 dimitri 1.2 ASSERT (curPiece == fieldInfo->nChunks);
782 dimitri 1.1 }
783    
784    
785    
786     /////////////////////////////////////////////////////////////////
787     // This is called by the i/o ranks
788     void
789     doNewEpoch(int epochID, int epochStyleIndex, int gcmIter)
790     {
791     // In an i/o epoch, the i/o ranks are partitioned into groups,
792     // each group dealing with one field. The ranks within a group:
793     // (1) receive data tiles from the compute ranks
794     // (2) slice the received data tiles into slabs in the 'z'
795     // dimension, and redistribute the tile-slabs among the group.
796     // (3) receive redistributed tile-slabs, and reconstruct a complete
797     // field-slab for the whole field.
798     // (4) Write the completed field-slab to disk.
799    
800     fieldInfoThisEpoch_t *fieldInfo;
801     int intracommRank, intracommSize;
802    
803     int zSlabsPer;
804     int myNumZSlabs, myFirstZSlab, myLastZSlab, myNumSlabPiecesToRecv;
805    
806 dimitri 1.2 int ii, numTilesRecvd, numSlabPiecesRecvd;
807 dimitri 1.1
808    
809     // Find the descriptor for my assigned field for this epoch style.
810     // It is the one whose dataIntercomm is not null
811     fieldInfo = epochStyles[epochStyleIndex];
812     while (MPI_COMM_NULL == fieldInfo->dataIntercomm) ++fieldInfo;
813     ASSERT('\0' != fieldInfo->dataFieldID);
814    
815    
816     MPI_Comm_rank(fieldInfo->ioRanksIntracomm, &intracommRank);
817     MPI_Comm_size(fieldInfo->ioRanksIntracomm, &intracommSize);
818    
819 dimitri 1.2 zSlabsPer = divCeil(fieldInfo->zDepth, intracommSize);
820 dimitri 1.1
821 dimitri 1.2 // Figure out if this rank writes z-slabs, and if so, which ones
822     myNumZSlabs = 0;
823     myFirstZSlab = 0;
824     myLastZSlab = -1;
825     for (ii = 0; ii < fieldInfo->nChunks; ++ii) {
826     if (fieldInfo->chunkWriters[ii] == intracommRank) {
827     myFirstZSlab = ii * zSlabsPer;
828     // The last chunk might be odd sized
829     myNumZSlabs = ((ii + 1) < fieldInfo->nChunks) ?
830     zSlabsPer : fieldInfo->zDepth - myFirstZSlab;
831     myLastZSlab = myFirstZSlab + myNumZSlabs - 1;
832    
833     break;
834     }
835     }
836     if (myNumZSlabs > 0) {
837     ASSERT ((myFirstZSlab >= 0) && (myFirstZSlab < fieldInfo->zDepth));
838     ASSERT ((myLastZSlab >= 0) && (myLastZSlab < fieldInfo->zDepth));
839     ASSERT (myLastZSlab >= myFirstZSlab);
840 dimitri 1.1 }
841 dimitri 1.2
842 dimitri 1.1
843     // If we were not assigned any z-slabs, we don't get any redistributed
844     // tile-slabs. If we were assigned one or more slabs, we get
845     // redistributed tile-slabs for every tile.
846 dimitri 1.2 myNumSlabPiecesToRecv = (0 == myNumZSlabs) ? 0 : NUM_WET_TILES;
847 dimitri 1.1
848    
849     numTilesRecvd = 0;
850     numSlabPiecesRecvd = 0;
851    
852     ////////////////////////////////////////////////////////////////////
853     // Main loop. Handle tiles from the current epoch
854     for(;;) {
855     bufHdr_t *bufHdr;
856    
857     ////////////////////////////////////////////////////////////////
858     // Check for tiles from the computational tasks
859     while (numTilesRecvd < fieldInfo->tileCount) {
860     int tileID = -1;
861     size_t msgSize = tileOneZLevelSizeInBytes * fieldInfo->zDepth;
862     bufHdr = tryToReceiveDataTile(fieldInfo->dataIntercomm, epochID,
863     msgSize, &tileID);
864     if (NULL == bufHdr) break; // No tile was received
865    
866     numTilesRecvd += 1;
867 dimitri 1.2 redistributeZSlabs(bufHdr, tileID, zSlabsPer, fieldInfo);
868 dimitri 1.1
869     // Add the bufHdr to the "in use" list
870     bufHdr->next = inUseTileBufs_ptr;
871     inUseTileBufs_ptr = bufHdr;
872     }
873    
874    
875     ////////////////////////////////////////////////////////////////
876     // Check for tile-slabs redistributed from the i/o processes
877     while (numSlabPiecesRecvd < myNumSlabPiecesToRecv) {
878 dimitri 1.2 long int msgSize = tileOneZLevelSizeInBytes * myNumZSlabs;
879 dimitri 1.1 char data[msgSize];
880    
881     int tileID = tryToReceiveZSlab(data, msgSize, fieldInfo->ioRanksIntracomm);
882     if (tileID < 0) break; // No slab was received
883    
884     numSlabPiecesRecvd += 1;
885     processSlabSection(fieldInfo, tileID, data, myNumZSlabs);
886    
887     // Can do the write here, or at the end of the epoch.
888     // Probably want to make it asynchronous (waiting for
889     // completion at the barrier at the start of the next epoch).
890     //if (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) {
891     // ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
892     // do_write(io_epoch,whichField,myFirstZSlab,myNumZSlabs);
893     //}
894     }
895    
896    
897     ////////////////////////////////////////////////////////////////
898 dimitri 1.2 // Sanity check for non-writers
899     if (0 == myNumSlabPiecesToRecv) {
900    
901     long int msgSize = tileOneZLevelSizeInBytes * myNumZSlabs;
902     char data[msgSize];
903    
904     // Check that no one has tried to re-distribute a slab to us.
905     int tileID = tryToReceiveZSlab(data, msgSize, fieldInfo->ioRanksIntracomm);
906     ASSERT (tileID < 0);
907     }
908    
909    
910     ////////////////////////////////////////////////////////////////
911 dimitri 1.1 // Check if we can release any of the inUse buffers (i.e. the
912     // isends we used to redistribute those z-slabs have completed).
913     // We do this by detaching the inUse list, then examining each
914     // element in the list, putting each element either into the
915     // free list or back into the inUse list, as appropriate.
916     bufHdr = inUseTileBufs_ptr;
917     inUseTileBufs_ptr = NULL;
918     while (NULL != bufHdr) {
919     int count = 0;
920     int completions[intracommSize];
921     MPI_Status statuses[intracommSize];
922    
923     // Acquire the "next" bufHdr now, before we change anything.
924     bufHdr_t *nextBufHeader = bufHdr->next;
925    
926     // Check to see if the Isend requests have completed for this buf
927     MPI_Testsome(intracommSize, bufHdr->requests,
928     &count, completions, statuses);
929    
930     if (MPI_UNDEFINED == count) {
931     // A return of UNDEFINED means that none of the requests
932     // is still outstanding, i.e. they have all completed.
933     // Put the buf on the free list.
934     bufHdr->state = bufState_Free;
935     bufHdr->next = freeTileBufs_ptr;
936     freeTileBufs_ptr = bufHdr;
937     FPRINTF(stderr,"Rank %d freed a tile buffer\n", intracommRank);
938     } else {
939     // At least one request is still outstanding.
940 dimitri 1.2 // Put the buf back on the inUse list.
941 dimitri 1.1 bufHdr->next = inUseTileBufs_ptr;
942     inUseTileBufs_ptr = bufHdr;
943     }
944    
945     // Continue with the next buffer
946     bufHdr = nextBufHeader;
947     }
948    
949    
950     ////////////////////////////////////////////////////////////////
951     // Check if the epoch is complete
952     if ((numTilesRecvd >= fieldInfo->tileCount) &&
953     (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) &&
954     (NULL == inUseTileBufs_ptr))
955     {
956     ASSERT(numTilesRecvd == fieldInfo->tileCount);
957     ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
958    
959     //fprintf(stderr,"rank %d %d %d %d\n",intracommRank,numTilesRecvd,numSlabPiecesRecvd,myNumZSlabs);
960    
961     // Ok, wrap up the current i/o epoch
962     // Probably want to make this asynchronous (waiting for
963     // completion at the start of the next epoch).
964     do_write(epochID, fieldInfo, myFirstZSlab, myNumZSlabs, gcmIter);
965    
966     // The epoch is complete
967     return;
968     }
969     }
970    
971     }
972    
973    
974    
975    
976     // Main routine for the i/o tasks. Callers DO NOT RETURN from
977     // this routine; they MPI_Finalize and exit.
978     void
979 dimitri 1.2 ioRankMain (void)
980 dimitri 1.1 {
981     // This is one of a group of i/o processes that will be receiving tiles
982     // from the computational processes. This same process is also
983     // responsible for compositing slices of tiles together into one or
984     // more complete slabs, and then writing those slabs out to disk.
985     //
986     // When a tile is received, it is cut up into slices, and each slice is
987     // sent to the appropriate compositing task that is dealing with those
988     // "z" level(s) (possibly including ourselves). These are tagged with
989     // the tile-id. When we *receive* such a message (from ourselves, or
990     // from any other process in this group), we unpack the data (stripping
991     // the ghost cells), and put them into the slab we are assembling. Once
992     // a slab is fully assembled, it is written to disk.
993    
994 dimitri 1.2 int i, ierr, count, numTileBufs;
995 dimitri 1.1 MPI_Status status;
996     int currentEpochID = 0;
997     int maxTileCount = 0, max3DTileCount = 0, maxIntracommSize = 0;
998    
999     int numComputeRanks, epochStyleIndex;
1000     MPI_Comm_remote_size(globalIntercomm, &numComputeRanks);
1001    
1002    
1003 dimitri 1.2 int *xoff = malloc(TOTAL_NUM_TILES*sizeof(int));
1004 dimitri 1.1 ASSERT(xoff);
1005 dimitri 1.2 int *yoff = malloc(TOTAL_NUM_TILES*sizeof(int));
1006 dimitri 1.1 ASSERT(yoff);
1007 dimitri 1.2 int *xskip = malloc(TOTAL_NUM_TILES*sizeof(int));
1008 dimitri 1.1 ASSERT(xskip);
1009    
1010 dimitri 1.2 offsetTable = malloc((TOTAL_NUM_TILES+1)*sizeof(tile_layout_t));
1011 dimitri 1.1 ASSERT(offsetTable);
1012    
1013 dimitri 1.2 ierr = MPI_Bcast(xoff,TOTAL_NUM_TILES,MPI_INT,0,globalIntercomm);
1014     ierr = MPI_Bcast(yoff,TOTAL_NUM_TILES,MPI_INT,0,globalIntercomm);
1015     ierr = MPI_Bcast(xskip,TOTAL_NUM_TILES,MPI_INT,0,globalIntercomm);
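    // xoff/yoff give each tile's location in the global NUM_X x NUM_Y plane
    // (1-based, judging from the "- 1" adjustments below) and xskip is the row
    // stride to use when scattering that tile into the plane; all three arrays
    // are broadcast by the compute side (remote rank 0) over the global intercomm.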
1016 dimitri 1.1
1017 dimitri 1.2 for (i=0;i<TOTAL_NUM_TILES;++i){
1018 dimitri 1.1 offsetTable[i+1].off = NUM_X*(yoff[i]-1) + xoff[i] - 1;
1019     offsetTable[i+1].skip = xskip[i];
1020     }
1021    
1022     free(xoff);
1023     free(yoff);
1024     free(xskip);
1025    
1026     ///////////////////////////////////////////////////////////////
1027    
1028    
1029     // At this point, the multitude of various inter-communicators have
1030     // been created and stored in the various epoch "style" arrays.
1031     // The next step is to have the computational tasks "register" the
1032     // data tiles they will send. In this version of the code, we only
1033     // track and store *counts*, not the actual tileIDs.
1034     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1035     fieldInfoThisEpoch_t *p = NULL;
1036     fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1037     int thisIntracommSize;
1038    
1039     // Find the field we were assigned for this epoch style
1040     int curFieldIndex = -1;
1041     while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
1042     p = &(thisEpochStyle[curFieldIndex]);
1043     if (MPI_COMM_NULL != p->registrationIntercomm) break;
1044     }
1045     ASSERT(NULL != p);
1046     ASSERT('\0' != p->dataFieldID);
1047 dimitri 1.2 MPI_Comm_size(p->ioRanksIntracomm, &thisIntracommSize);
1048     if (thisIntracommSize > maxIntracommSize) {
1049     maxIntracommSize = thisIntracommSize;
1050     }
1051 dimitri 1.1
1052    
1053     // Receive a message from *each* computational rank telling us
1054     // a count of how many tiles that rank will send during epochs
1055     // of this style. Note that some of these counts may be zero
1056     for (i = 0; i < numComputeRanks; ++i) {
1057     MPI_Recv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG,
1058     p->registrationIntercomm, &status);
1059     p->tileCount += status.MPI_TAG;
1060     }
1061     if (p->tileCount > maxTileCount) maxTileCount = p->tileCount;
1062     if (p->zDepth > 1) {
1063     if (p->tileCount > max3DTileCount) max3DTileCount = p->tileCount;
1064     }
1065    
1066     // Sanity check
1067     MPI_Allreduce (&(p->tileCount), &count, 1,
1068     MPI_INT, MPI_SUM, p->ioRanksIntracomm);
1069 dimitri 1.2 ASSERT(count == NUM_WET_TILES);
1070 dimitri 1.1 }
1071    
1072    
1073     // In some as-yet-undetermined fashion, we decide how many
1074     // buffers to allocate to hold tiles received from the computational
1075     // tasks. The number of such buffers is more-or-less arbitrary (the
1076     // code should be able to function with any number greater than zero).
1077     // More buffers means more parallelism, but uses more space.
1078     // Note that maxTileCount is an upper bound on the number of buffers
1079     // that we can usefully use. Note also that if we are assigned a 2D
1080     // field, we might be assigned to receive a very large number of tiles
1081     // for that field in that epoch style.
1082    
1083     // Hack - for now, just pick a value for numTileBufs
1084     numTileBufs = (max3DTileCount > 0) ? max3DTileCount : maxTileCount;
1085     if (numTileBufs < 4) numTileBufs = 4;
1086 dimitri 1.2 if (numTileBufs > 25) numTileBufs = 25;
1087 dimitri 1.1
1088     allocateTileBufs(numTileBufs, maxIntracommSize);
1089     countBufs(numTileBufs);
1090    
1091    
1092     ////////////////////////////////////////////////////////////////////
1093     // Main loop.
1094     ////////////////////////////////////////////////////////////////////
1095    
1096     for (;;) {
1097     int cmd[4];
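        // Command layout (as decoded below): cmd[0] = epochCmd_t command,
        // cmd[1] = epoch ID, cmd[2] = epoch style index, cmd[3] = gcmIter.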
1098    
1099     // Make sure all the i/o ranks have completed processing the prior
1100     // cmd (i.e. the prior i/o epoch is complete).
1101     MPI_Barrier(ioIntracomm);
1102    
1103     if (0 == ioIntracommRank) {
1104 dimitri 1.2 fprintf(stderr, "I/O ranks waiting for new epoch at time %f\n",MPI_Wtime());
1105     MPI_Send(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, globalIntercomm);
1106 dimitri 1.1
1107     MPI_Recv(cmd, 4, MPI_INT, 0, 0, globalIntercomm, MPI_STATUS_IGNORE);
1108 dimitri 1.2 fprintf(stderr, "I/O ranks begining new epoch: %d, gcmIter = %d, at time %f\n",
1109     cmd[1], cmd[3], MPI_Wtime());
1110 dimitri 1.1
1111     // before we start a new epoch, have i/o rank 0:
1112     // determine output filenames for this epoch
1113     // clean up any extant files with same names
1114     // write .meta files
1115    
1116     if (cmd_exit != cmd[0]){
1117    
1118     fprintf(stderr,"new epoch: epoch %d, style %d, gcmIter %d\n", cmd[1],cmd[2],cmd[3]);
1119    
1120     int epochStyle = cmd[2];
1121     int gcmIter = cmd[3];
1122    
1123     fieldInfoThisEpoch_t *fieldInfo;
1124     fieldInfo = epochStyles[epochStyle];
1125     char s[1024];
1126     int res;
1127     FILE *fp;
1128    
1129     if (fieldInfo->pickup==0){ // for non-pickups, need to loop over individual fields
1130     char f;
1131     while ((f = fieldInfo->dataFieldID)){
1132     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1133     fprintf(stderr,"%s\n",s);
1134     res = unlink(s);
1135     if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1136    
1137     // skip writing meta files for non-pickup fields
1138     /*
1139     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
1140     fp = fopen(s,"w+");
1141     fclose(fp);
1142     */
1143    
1144     ++fieldInfo;
1145    
1146     }
1147     }
1148    
1149     else { // single pickup or pickup_seaice file
1150    
1151     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1152     fprintf(stderr,"%s\n",s);
1153     res = unlink(s);
1154     if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1155    
1156     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
1157     fp = fopen(s,"w+");
1158     write_pickup_meta(fp, gcmIter, fieldInfo->pickup);
1159     fclose(fp);
1160    
1161     }
1162     }
1163     }
1164     MPI_Bcast(cmd, 4, MPI_INT, 0, ioIntracomm);
1165    
1166 dimitri 1.2 if (0 == ioIntracommRank)
1167     fprintf(stderr,"i/o handshake completed %d %d %f\n",cmd[1],cmd[3],MPI_Wtime());
1168    
1169 dimitri 1.1 switch (cmd[0]) {
1170    
1171     case cmd_exit:
1172     // Don't bother trying to disconnect and free the
1173     // plethora of communicators; just exit.
1174     FPRINTF(stderr,"Received shutdown message\n");
1175     MPI_Finalize();
1176     exit(0);
1177     break;
1178    
1179     case cmd_newEpoch:
1180     if ((currentEpochID + 1) != cmd[1]) {
1181     fprintf(stderr, "ERROR: Missing i/o epoch? was %d, "
1182     "but is now %d ??\n", currentEpochID, cmd[1]);
1183     sleep(1); // Give the message a chance to propagate
1184     abort();
1185     }
1186     currentEpochID = cmd[1];
1187    
1188     memset(outBuf,0,outBufSize); // zero the outBuf, so dry tiles are well defined
1189    
1190     doNewEpoch(cmd[1], cmd[2], cmd[3]);
1191     break;
1192    
1193     default:
1194     fprintf(stderr, "Unexpected epoch command: %d %d %d %d\n",
1195     cmd[0], cmd[1], cmd[2], cmd[3]);
1196     sleep(1);
1197     abort();
1198     break;
1199     }
1200    
1201     }
1202    
1203     }
1204    
1205    
1206    
1207     ///////////////////////////////////////////////////////////////////////////////////
1208    
1209     int
1210     findField(const char c)
1211     {
1212     int i;
1213     for (i = 0; i < numAllFields; ++i) {
1214     if (c == fieldDepths[i].dataFieldID) return i;
1215     }
1216    
1217     // Give the error message a chance to propagate before exiting.
1218     fprintf(stderr, "ERROR: Field not found: '%c'\n", c);
1219     sleep(1);
1220     abort();
1221     }
1222    
1223    
1224    
1225     // Given a number of ranks and a set of fields we will want to output,
1226     // figure out how to distribute the ranks among the fields.
1227 dimitri 1.2 // We attempt to distribute according to three separate criteria:
1228     // 1) Try to make the number of ranks assigned to a field be
1229     // proportional to the number of bytes in that field.
1230     // 2) Try to even out the total number of MPI messages that are
1231     // sent to any given i/o node.
1232     // 3) Try to even out the amount of output being written from
1233     // any given i/o node.
1234    
1235 dimitri 1.1 void
1236     computeIORankAssigments(
1237     int numComputeRanks,
1238     int numIORanks,
1239     int numFields,
1240     fieldInfoThisEpoch_t *fields,
1241 dimitri 1.2 int assignments[],
1242     int nToWrite[])
1243 dimitri 1.1 {
1244 dimitri 1.2 int i,j,k, iteration, sum, count;
1245 dimitri 1.1
1246     int numIONodes = numIORanks / numRanksPerNode;
1247 dimitri 1.2 int numIORanksThisField[numFields], nRanks[numFields];
1248 dimitri 1.1 long int bytesPerIORankThisField[numFields];
1249     int expectedMessagesPerRankThisField[numFields];
1250 dimitri 1.2 int isWriter[numIORanks];
1251 dimitri 1.1 long int fieldSizes[numFields];
1252    
1253     struct ioNodeInfo_t {
1254     int expectedNumMessagesThisNode;
1255     int numCoresAssigned;
1256     int *assignedFieldThisCore;
1257     } ioNodeInfo[numIONodes];
1258    
1259     // Since numRanksPerNode might be dynamically determined, we have
1260     // to dynamically allocate the assignedFieldThisCore arrays.
1261     for (i = 0; i < numIONodes; ++i) {
1262     ioNodeInfo[i].assignedFieldThisCore = malloc(sizeof(int)*numRanksPerNode);
1263     ASSERT(NULL != ioNodeInfo[i].assignedFieldThisCore);
1264     }
1265    
1266     ASSERT((numIONodes*numRanksPerNode) == numIORanks);
1267    
1268 dimitri 1.2 memset (assignments, 0, numIORanks*sizeof(int));
1269     memset (isWriter, 0, numIORanks*sizeof(int));
1270    
1271 dimitri 1.1
1272     //////////////////////////////////////////////////////////////
1273     // Compute the size for each field in this epoch style
1274     for (i = 0; i < numFields; ++i) {
1275     ASSERT((1 == fields[i].zDepth) || (NUM_Z == fields[i].zDepth) || (MULTDIM == fields[i].zDepth));
1276     fieldSizes[i] = twoDFieldSizeInBytes * fields[i].zDepth;
1277     }
1278    
1279    
1280 dimitri 1.2 /////////////////////////////////////////////////////////////////
1281     // Phase 1: Distribute the number of available i/o ranks among
1282     // the fields, proportionally based on field size.
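    // (Illustrative example with made-up sizes: three fields of 100, 50 and 10
    // units and 5 i/o ranks.  Each field first gets one rank, leaving per-rank
    // loads of {100, 50, 10}; the 4th rank goes to the first field (100 -> 50)
    // and the 5th rank again to the first field (ties favor the earlier field),
    // ending with {3, 1, 1} ranks per field.)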
1283 dimitri 1.1
1284     // First, assign one rank per field
1285     ASSERT(numIORanks >= numFields);
1286     for (i = 0; i < numFields; ++i) {
1287     numIORanksThisField[i] = 1;
1288     bytesPerIORankThisField[i] = fieldSizes[i];
1289     }
1290    
1291     // Now apportion any extra ranks
1292     for (; i < numIORanks; ++i) {
1293    
1294     // Find the field 'k' with the most bytesPerIORank
1295     k = 0;
1296     for (j = 1; j < numFields; ++j) {
1297     if (bytesPerIORankThisField[j] > bytesPerIORankThisField[k]) {
1298     k = j;
1299     }
1300     }
1301    
1302     // Assign an i/o rank to that field
1303     numIORanksThisField[k] += 1;
1304     bytesPerIORankThisField[k] = fieldSizes[k] / numIORanksThisField[k];
1305     }
1306    
1307 dimitri 1.2 /////////////////////////////////////////////////////////////////
1308     // At this point, the number of i/o ranks should be apportioned
1309 dimitri 1.1 // among the fields. Check we didn't screw up the count.
1310     for (sum = 0, i = 0; i < numFields; ++i) {
1311     sum += numIORanksThisField[i];
1312     }
1313     ASSERT(sum == numIORanks);
1314    
1315 dimitri 1.2 // Make a copy of numIORanksThisField
1316     memcpy (nRanks, numIORanksThisField, numFields*sizeof(int));
1317    
1318 dimitri 1.1
1319    
1320     //////////////////////////////////////////////////////////////////
1321 dimitri 1.2 // Phase 2: try to even out the number of messages per node.
1322 dimitri 1.1 // The *number* of i/o ranks assigned to a field is based on the
1323     // field size. But the *placement* of those ranks is based on
1324     // the expected number of messages the process will receive.
1325     // [In particular, if we have several fields each with only one
1326     // assigned i/o rank, we do not want to place them all on the
1327     // same node if we don't have to.] This traffic-load balance
1328     // strategy is an approximation at best since the messages may
1329     // not all be the same size (e.g. 2D fields vs. 3D fields), so
1330     // "number of messages" is not the same thing as "traffic".
1331     // But it should suffice.
1332    
1333     // Init a couple of things
1334     for (i = 0; i < numFields; ++i) {
1335     expectedMessagesPerRankThisField[i] =
1336     numComputeRanks / numIORanksThisField[i];
1337     }
1338     for (i = 0; i < numIONodes; ++i) {
1339     ioNodeInfo[i].expectedNumMessagesThisNode = 0;
1340     ioNodeInfo[i].numCoresAssigned = 0;
1341     for (j = 0; j < numRanksPerNode; ++j) {
1342     ioNodeInfo[i].assignedFieldThisCore[j] = -1;
1343     }
1344     }
1345    
1346     ///////////////////////////////////////////////////////////////////
1347     // Select the i/o node with the smallest expectedNumMessages, and
1348     // assign it a rank from the field with the highest remaining
1349     // expectedMessagesPerRank. Repeat until everything is assigned.
1350     // (Yes, this could be a lot faster, but isn't worth the trouble.)
1351    
1352     for (count = 0; count < numIORanks; ++count) {
1353    
1354     // Make an initial choice of node
1355     for (i = 0; i < numIONodes; ++i) {
1356     if (ioNodeInfo[i].numCoresAssigned < numRanksPerNode) break;
1357     }
1358     j = i;
1359     ASSERT(j < numIONodes);
1360    
1361     // Search for a better choice
1362     for (++i; i < numIONodes; ++i) {
1363     if (ioNodeInfo[i].numCoresAssigned >= numRanksPerNode) continue;
1364     if (ioNodeInfo[i].expectedNumMessagesThisNode <
1365     ioNodeInfo[j].expectedNumMessagesThisNode)
1366     {
1367     j = i;
1368     }
1369     }
1370    
1371    
1372     // Make an initial choice of field
1373     for (i = 0; i < numFields; ++i) {
1374 dimitri 1.2 if (nRanks[i] > 0) break;
1375 dimitri 1.1 }
1376     k = i;
1377     ASSERT(k < numFields);
1378    
1379     // Search for a better choice
1380     for (++i; i < numFields; ++i) {
1381 dimitri 1.2 if (nRanks[i] <= 0) continue;
1382 dimitri 1.1 if (expectedMessagesPerRankThisField[i] >
1383     expectedMessagesPerRankThisField[k])
1384     {
1385     k = i;
1386     }
1387     }
1388    
1389     // Put one rank from the chosen field onto the chosen node
1390     ioNodeInfo[j].expectedNumMessagesThisNode += expectedMessagesPerRankThisField[k];
1391     ioNodeInfo[j].assignedFieldThisCore[ioNodeInfo[j].numCoresAssigned] = k;
1392     ioNodeInfo[j].numCoresAssigned += 1;
1393 dimitri 1.2 nRanks[k] -= 1;
1394 dimitri 1.1 }
1395    
1396     // Sanity check - all ranks were assigned to a core
1397     for (i = 0; i < numFields; ++i) {
1398 dimitri 1.2 ASSERT(0 == nRanks[i]);
1399 dimitri 1.1 }
1400     // Sanity check - all cores were assigned a rank
1401     for (i = 0; i < numIONodes; ++i) {
1402     ASSERT(numRanksPerNode == ioNodeInfo[i].numCoresAssigned);
1403     }
1404    
1405    
1406 dimitri 1.2 //////////////////////////////////////////////////////////////////
1407     // Phase 3: Select which ranks will be assigned to write out the
1408     // fields. We attempt to balance the amount that each node is
1409     // assigned to write out. Since Phase 1 and Phase 2 have already
1410     // fixed a number of things, our options for balancing the
1411     // remaining work are somewhat restricted.
1412    
1413    
1414     // Count how many nodes each field is distributed across
1415     int numIONodesThisField[numFields];
1416     for (i = 0; i < numFields; ++i) {
1417     numIONodesThisField[i] = 0;
1418     for (j = 0; j < numIONodes; ++j) {
1419     for (k = 0; k < numRanksPerNode; ++k) {
1420     if (ioNodeInfo[j].assignedFieldThisCore[k] == i) {
1421     numIONodesThisField[i] += 1;
1422     break;
1423     }
1424     }
1425     }
1426     ASSERT (numIONodesThisField[i] > 0);
1427     ASSERT (numIONodesThisField[i] <= numIONodes);
1428     }
1429    
1430    
1431     // Init a per-node running count of z-levels-to-write
1432     memset (nToWrite, 0, numIONodes*sizeof(int));
1433    
1434    
1435     // Iterate through all the fields, although we will select
1436     // which field to operate on (i.e. curField) non-sequentially.
1437     for (iteration = 0; iteration < numFields; ++iteration) {
1438    
1439     // Pick the field distributed across the fewest nodes, on
1440     // the theory that this field will have the least flexibility.
1441     int curField = 0;
1442     for (j = 1; j < numFields; ++j) {
1443     if (numIONodesThisField[j] < numIONodesThisField[curField]) {
1444     curField = j;
1445     }
1446     }
1447     ASSERT (numIONodesThisField[curField] <= numIONodes);
1448    
1449     // Now that we have chosen a field, identify any i/o nodes
1450     // that have rank(s) assigned to that field.
1451     int nAssigned[numIONodes];
1452     for (i = 0; i < numIONodes; ++i) {
1453     nAssigned[i] = 0;
1454     for (j = 0; j < numRanksPerNode; ++j) {
1455     if (ioNodeInfo[i].assignedFieldThisCore[j] == curField) {
1456     nAssigned[i] += 1;
1457     }
1458     }
1459     }
1460    
1461    
1462     // We do the writes in units of entire z-levels. If a rank is a
1463     // writer, it is assigned to write a chunk of one or more z-levels.
1464     int chunkSize = divCeil (fields[curField].zDepth, numIORanksThisField[curField]);
1465     int nChunks = divCeil (fields[curField].zDepth, chunkSize);
1466     int curChunk;
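    // [Editor's note -- worked example, not from the original code: with
    //  zDepth = 50 and numIORanksThisField = 4, chunkSize = divCeil(50,4)
    //  = 13, nChunks = divCeil(50,13) = 4, and the short last chunk
    //  (handled below) is 50 - 3*13 = 11 levels.]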
1467    
1468     // Designate the same number of ranks to be writers as there are chunks
1469     fields[curField].chunkWriters = malloc (nChunks*sizeof(int));
1470     ASSERT (fields[curField].chunkWriters != NULL);
1471     fields[curField].nChunks = nChunks;
1472     int nAvailable[numIONodes];
1473     memcpy (nAvailable, nAssigned, numIONodes*sizeof(int));
1474     for (curChunk = 0; curChunk < nChunks; ++curChunk) {
1475    
1476     // Note that the last chunk might be an odd size (if chunkSize
1477     // does not evenly divide zDepth).
1478     if ((curChunk + 1) == nChunks) {
1479     chunkSize = fields[curField].zDepth - curChunk*chunkSize;
1480     }
1481    
1482     // Pick a node that has at least one rank assigned to curField
1483     // that has not already been designated as a writer.
1484     // There must still be at least one, or we would have exited
1485     // the loop already.
1486     int curNode;
1487     for (curNode = 0; curNode < numIONodes; ++curNode) {
1488     if (nAvailable[curNode] > 0) break;
1489     }
1490     ASSERT (curNode < numIONodes);
1491    
1492     // curNode is a candidate node; try to find an acceptable
1493     // candidate node with a smaller nToWrite
1494     for (i = curNode + 1; i < numIONodes; ++i) {
1495     if ((nAvailable[i] > 0) && (nToWrite[i] < nToWrite[curNode])) {
1496     curNode = i;
1497     }
1498     }
1499    
1500     // Find an appropriate rank on the chosen node
1501     for (j = 0; j < numRanksPerNode; ++j) {
1502     if ( (ioNodeInfo[curNode].assignedFieldThisCore[j] == curField) &&
1503     (!isWriter[curNode*numRanksPerNode + j]) )
1504     {
1505     break;
1506     }
1507     }
1508     // Double-check that we found an appropriate rank.
1509     ASSERT (j < numRanksPerNode);
1510    
1511    
1512     // We've picked a rank to be the writer of the current chunk.
1513     // Now we need to figure out which rank this will wind up being
1514     // in the "ioRanksIntracomm" for this field (when that intracomm
1515     // eventually gets created), so we can set "chunkWriters".
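        // [Editor's note, added for clarity: MPI_Comm_split orders the
        //  field's intracomm by parent rank, i.e. node by node and core
        //  by core.  For example, if nodes 0 and 1 each contribute two
        //  cores to curField and we picked core j on node 1 with one
        //  matching core before it, eventualCommRank = 2 + 1 = 3.]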
1516     int eventualCommRank = 0;
1517     for (i = 0; i < curNode; ++i) eventualCommRank += nAssigned[i];
1518     for (i = 0; i < j; ++i) {
1519     if (ioNodeInfo[curNode].assignedFieldThisCore[i] == curField) {
1520     eventualCommRank += 1;
1521     }
1522     }
1523    
1524    
1525     // Update the info for this choice of node+rank
1526     fields[curField].chunkWriters[curChunk] = eventualCommRank;
1527     isWriter[curNode*numRanksPerNode + j] = 1;
1528     nAvailable[curNode] -= 1;
1529     nToWrite[curNode] += chunkSize;
1530    
1531     }
1532    
1533     // We've finished with this curField; mark it so that we don't
1534     // re-select this same curField the next time through the loop.
1535     numIONodesThisField[curField] = numIONodes + 1;
1536     }
1537    
1538    
1539     //////////////////////////////////////////////////////////////////
1540 dimitri 1.1 // Return the computed assignments
1541 dimitri 1.2 // N.B. We are also returning info in "fields[*].chunkWriters"
1542 dimitri 1.1 for (i = 0; i < numIONodes; ++i) {
1543     for (j = 0; j < numRanksPerNode; ++j) {
1544     assignments[i*numRanksPerNode + j] =
1545     ioNodeInfo[i].assignedFieldThisCore[j];
1546     }
1547     }
1548    
1549 dimitri 1.2
1550 dimitri 1.1 // Clean up
1551     for (i = 0; i < numIONodes; ++i) {
1552     free(ioNodeInfo[i].assignedFieldThisCore);
1553     }
1554    
1555     }
1556    
1557     //////////////////////////////////////////////////////////////////////////////////
1558    
1559    
1560     int
1561     isIORank(int commRank, int totalNumNodes, int numIONodes)
1562     {
1563     // Figure out if this rank is on a node that will do i/o.
1564     // Note that the i/o nodes are distributed throughout the
1565     // task, not clustered together.
1566     int thisRankNode = commRank / numRanksPerNode;
1567    
1568 dimitri 1.2 int ioNodeStride = totalNumNodes / numIONodes;
1569     int extra = totalNumNodes % numIONodes;
1570 dimitri 1.1
1571 dimitri 1.2 int inflectionPoint = (ioNodeStride+1)*extra;
1572     if (thisRankNode <= inflectionPoint) {
1573     return (thisRankNode % (ioNodeStride+1)) == 0;
1574     } else {
1575     return ((thisRankNode - inflectionPoint) % ioNodeStride) == 0;
1576     }
1577 dimitri 1.1
1578 dimitri 1.2 }
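
// [Editor's note: the demo below is an illustration added in editing and
//  is not part of the original revision; it is disabled with "#if 0".
//  If enabled, it would print which ranks isIORank() treats as i/o ranks
//  for an assumed layout of 10 nodes, 3 i/o nodes, and 2 ranks per node,
//  i.e. nodes 0, 4, and 7 (ranks 0-1, 8-9, 14-15).]
#if 0
static void
demoIORankLayout (void)
{
    int rank;
    numRanksPerNode = 2;   // global consulted by isIORank()
    for (rank = 0; rank < 20; ++rank) {
        if (isIORank (rank, 10, 3)) {
            printf ("rank %2d (node %d) is an i/o rank\n",
                    rank, rank / numRanksPerNode);
        }
    }
}
#endif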
1579 dimitri 1.1
1580    
1581    
1582 dimitri 1.2 // Get the number of MPI ranks that are running on one node.
1583     // We assume that the MPI ranks are launched in sequence, filling one
1584     // node before going to the next, and that each node has the same
1585     // number of ranks (except possibly the last node, which is allowed
1586     // to be short).
1587     int
1588     getNumRanksPerNode (MPI_Comm comm)
1589     {
1590     char myHostname[HOST_NAME_MAX+1];
1591     int status, count;
1592     int commSize, commRank;
1593    
1594     MPI_Comm_size (comm, &commSize);
1595     MPI_Comm_rank (comm, &commRank);
1596    
1597     status = gethostname (myHostname, HOST_NAME_MAX);
1598     myHostname[HOST_NAME_MAX] = '\0';
1599     ASSERT (0 == status);
1600    
1601     if (0 == commRank) {
1602     int nBlocks, ii, jj;
1603     assert (allHostnames != NULL);
1604    
1605     // We assume these are already in-order and so don't
1606     // need to be sorted.
1607    
1608     // Count how many ranks have the same hostname as rank 0
1609     for (count = 0;
1610     (count < commSize) &&
1611     (strcmp(&(allHostnames[count][0]), myHostname) == 0);
1612     ++count);
1613     ASSERT (count > 0);
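        // [Editor's note -- example, not original: if the job was launched
        //  with 4 ranks per node, allHostnames looks like
        //  {n0,n0,n0,n0,n1,n1,n1,n1,...}, so count comes out 4 and the
        //  block check below verifies each later group of 4 shares a
        //  single hostname.]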
1614    
1615     // Check that the count is consistent for each block of hostnames
1616     // (except possibly the last block, which might not be full).
1617     nBlocks = commSize / count;
1618    
1619     for (jj = 1; jj < nBlocks; ++jj) {
1620     char *p = &(allHostnames[jj*count][0]);
1621     for (ii = 0; ii < count; ++ii) {
1622     char *q = &(allHostnames[jj*count + ii][0]);
1623     ASSERT (strcmp (p, q) == 0);
1624     }
1625     }
1626     }
1627 dimitri 1.1
1628 dimitri 1.2 MPI_Bcast (&count, 1, MPI_INT, 0, comm);
1629     return count;
1630 dimitri 1.1 }
1631    
1632    
1633     ////////////////////////////////////////////////////////////////////////
1634     ////////////////////////////////////////////////////////////////////////
1635     // Routines called by the MITgcm code
1636    
1637    
1638     ////////////////////////////////////////
1639     // Various "one time" initializations
1640     void
1641     f1(
1642     MPI_Comm parentComm,
1643     int numComputeRanks,
1644     int numTiles,
1645     MPI_Comm *rtnComputeComm)
1646     {
1647 dimitri 1.2 // Note that we ignore the argument "numTiles". This value used to
1648     // be needed, but now we get the information directly from "SIZE.h"
1649    
1650 dimitri 1.1 MPI_Comm newIntracomm, newIntercomm, dupParentComm;
1651     int newIntracommRank, thisIsIORank;
1652     int parentSize, parentRank;
1653     int numIONodes, numIORanks;
1654     int mpiIsInitialized, *intPtr, tagUBexists;
1655     int i, j, nF, compRoot, fieldIndex, epochStyleIndex;
1656     int totalNumNodes, numComputeNodes, newIntracommSize;
1657    
1658 dimitri 1.2
1659 dimitri 1.1 // Init globals
1660 dimitri 1.2
1661     // Info about the parent communicator
1662     MPI_Initialized(&mpiIsInitialized);
1663     ASSERT(mpiIsInitialized);
1664     MPI_Comm_size(parentComm, &parentSize);
1665     MPI_Comm_rank(parentComm, &parentRank);
1666    
1667     // Find the max MPI "tag" value
1668     MPI_Attr_get(MPI_COMM_WORLD, MPI_TAG_UB, &intPtr, &tagUBexists);
1669     ASSERT(tagUBexists);
1670     maxTagValue = *intPtr;
1671    
1672     // Gather all the hostnames to rank zero
1673     char myHostname[HOST_NAME_MAX+1];
1674     int status;
1675     status = gethostname (myHostname, HOST_NAME_MAX);
1676     myHostname[HOST_NAME_MAX] = '\0';
1677     ASSERT (0 == status);
1678     if (parentRank != 0) {
1679     // send my hostname to rank 0
1680     MPI_Gather (myHostname, HOST_NAME_MAX+1, MPI_CHAR,
1681     NULL, 0, MPI_CHAR, 0, parentComm);
1682     }
1683     else {
1684     allHostnames = malloc (parentSize*(HOST_NAME_MAX+1));
1685     assert (allHostnames != NULL);
1686    
1687     // Collect the hostnames from all the ranks
1688     MPI_Gather (myHostname, HOST_NAME_MAX+1, MPI_CHAR,
1689     allHostnames, HOST_NAME_MAX+1, MPI_CHAR,
1690     0, parentComm);
1691     }
1692    
1693 dimitri 1.1 if (numRanksPerNode <= 0) {
1694     // Might also want to check for an env var (or something)
1695 dimitri 1.2 // N.B.: getNumRanksPerNode uses an MPI collective on the communicator
1696     // and needs the global "allHostnames" to already be set in rank 0.
1697     numRanksPerNode = getNumRanksPerNode(parentComm);
1698 dimitri 1.1 }
1699    
1700     // Fill in the zDepth field of the fieldInfoThisEpoch_t descriptors
1701     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1702     fieldInfoThisEpoch_t *f, *thisEpochStyle = epochStyles[epochStyleIndex];
1703    
1704     int curFieldIndex = -1;
1705     while ((f = &(thisEpochStyle[++curFieldIndex]))->dataFieldID != '\0') {
1706     i = findField(f->dataFieldID);
1707     if (-1 != f->zDepth) {
1708     if (f->zDepth != fieldDepths[i].numZ) {
1709     fprintf(stderr, "Inconsistent z-depth for field '%c': "
1710     "fieldDepths[%d] and epoch style %d\n",
1711     f->dataFieldID, i, epochStyleIndex);
1712     }
1713     }
1714     f->zDepth = fieldDepths[i].numZ;
1715     }
1716     }
1717    
1718    
1719    
1720     // Figure out how many nodes we can use for i/o.
1721     // To make things (e.g. memory calculations) simpler, we want
1722     // a node to have either all compute ranks, or all i/o ranks.
1723     // If numRanksPerNode does not evenly divide numComputeRanks, we have
1724     // to round up in favor of the compute side.
1725    
1726     totalNumNodes = divCeil(parentSize, numRanksPerNode);
1727     numComputeNodes = divCeil(numComputeRanks, numRanksPerNode);
1728     numIONodes = (parentSize - numComputeRanks) / numRanksPerNode;
1729     ASSERT(numIONodes > 0);
1730     ASSERT(numIONodes <= (totalNumNodes - numComputeNodes));
1731     numIORanks = numIONodes * numRanksPerNode;
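    // [Editor's note -- worked example, not from the original code: with
    //  parentSize = 80, numRanksPerNode = 16, and numComputeRanks = 64,
    //  totalNumNodes = 5, numComputeNodes = 4, numIONodes = 1, and
    //  numIORanks = 16, so every launched rank is either compute or i/o.
    //  Launched with 88 ranks instead, numIONodes is still 1, and the 8
    //  ranks beyond numComputeRanks + numIORanks = 80 become the "excess"
    //  ranks handled below.]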
1732    
1733    
1734 dimitri 1.2 // Print out the hostnames, identifying which ones are i/o nodes
1735    
1736     if (0 == parentRank) {
1737     printf ("\n%d total nodes, %d i/o, %d compute\n",
1738     totalNumNodes, numIONodes, numComputeNodes);
1739     for (i = 0; i < parentSize; i += numRanksPerNode) {
1740     if (isIORank (i, totalNumNodes, numIONodes)) {
1741     printf ("\n(%s)", &(allHostnames[i][0]));
1742     } else {
1743     printf (" %s", &(allHostnames[i][0]));
1744     }
1745     }
1746     printf ("\n\n");
1747     }
1748     fflush (stdout);
1749    
1750    
1751    
1752 dimitri 1.1 // It is surprisingly easy to launch the job with the wrong number
1753     // of ranks, particularly if the number of compute ranks is not a
1754     // multiple of numRanksPerNode (the tendency is to just launch one
1755     // rank per core for all available cores). So we introduce a third
1756     // option for a rank: besides being an i/o rank or a compute rank,
1757     // it might instead be an "excess" rank, in which case it just
1758     // calls MPI_Finalize and exits.
1759    
1760     typedef enum { isCompute, isIO, isExcess } rankType;
1761     rankType thisRanksType;
1762    
1763     if (isIORank(parentRank, totalNumNodes, numIONodes)) {
1764     thisRanksType = isIO;
1765     } else if (parentRank >= numComputeRanks + numIORanks) {
1766     thisRanksType = isExcess;
1767     } else {
1768     thisRanksType = isCompute;
1769     }
1770    
1771     // Split the parent communicator into parts
1772     MPI_Comm_split(parentComm, thisRanksType, parentRank, &newIntracomm);
1773     MPI_Comm_rank(newIntracomm, &newIntracommRank);
1774    
1775     // Excess ranks disappear
1776 dimitri 1.2 // N.B. "parentComm" (and parentSize) still includes these ranks, so
1777     // we cannot do MPI *collectives* using parentComm after this point,
1778     // although the communicator can still be used for other things.
1779 dimitri 1.1 if (isExcess == thisRanksType) {
1780     MPI_Finalize();
1781     exit(0);
1782     }
1783    
1784     // Sanity check
1785     MPI_Comm_size(newIntracomm, &newIntracommSize);
1786     if (isIO == thisRanksType) {
1787     ASSERT(newIntracommSize == numIORanks);
1788     } else {
1789     ASSERT(newIntracommSize == numComputeRanks);
1790     }
1791    
1792    
1793     // Create a special intercomm from the i/o and compute parts
1794     if (isIO == thisRanksType) {
1795     // Set globals
1796     ioIntracomm = newIntracomm;
1797     MPI_Comm_rank(ioIntracomm, &ioIntracommRank);
1798    
1799     // Find the lowest computation rank
1800     for (i = 0; i < parentSize; ++i) {
1801     if (!isIORank(i, totalNumNodes, numIONodes)) break;
1802     }
1803     } else { // isCompute
1804     // Set globals
1805     computeIntracomm = newIntracomm;
1806     MPI_Comm_rank(computeIntracomm, &computeIntracommRank);
1807    
1808     // Find the lowest IO rank
1809     for (i = 0; i < parentSize; ++i) {
1810     if (isIORank(i, totalNumNodes, numIONodes)) break;
1811     }
1812     }
1813     MPI_Intercomm_create(newIntracomm, 0, parentComm, i, 0, &globalIntercomm);
1814    
1815    
1816    
1817     ///////////////////////////////////////////////////////////////////
1818     // For each different i/o epoch style, split the i/o ranks among
1819     // the fields, and create an inter-communicator for each split.
1820    
1821     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1822 dimitri 1.2 if (0 == parentRank) {
1823     printf ("Set up epoch style %d\n", epochStyleIndex);
1824     }
1825    
1826 dimitri 1.1 fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1827     int fieldAssignments[numIORanks];
1828     int rankAssignments[numComputeRanks + numIORanks];
1829    
1830     // Count the number of fields in this epoch style
1831     for (nF = 0; thisEpochStyle[nF].dataFieldID != '\0'; ++nF) ;
1832    
1833 dimitri 1.2 // Decide how to apportion the i/o ranks among the fields.
1834     // (Currently, this call also sets the "chunkWriters" array.)
1835 dimitri 1.1 for (i=0; i < numIORanks; ++i) fieldAssignments[i] = -1;
1836 dimitri 1.2 int nToWrite[numIORanks/numRanksPerNode];
1837 dimitri 1.1 computeIORankAssigments(numComputeRanks, numIORanks, nF,
1838 dimitri 1.2 thisEpochStyle, fieldAssignments, nToWrite);
1839 dimitri 1.1 // Sanity check
1840     for (i=0; i < numIORanks; ++i) {
1841     ASSERT((fieldAssignments[i] >= 0) && (fieldAssignments[i] < nF));
1842     }
1843    
1844     // Embed the i/o rank assignments into an array holding
1845     // the assignments for *all* the ranks (i/o and compute).
1846     // Rank assignment of "-1" means "compute".
1847     j = 0;
1848     for (i = 0; i < numComputeRanks + numIORanks; ++i) {
1849     rankAssignments[i] = isIORank(i, totalNumNodes, numIONodes) ?
1850     fieldAssignments[j++] : -1;
1851     }
1852     // Sanity check. Check the assignment for this rank.
1853     if (isIO == thisRanksType) {
1854     ASSERT(fieldAssignments[newIntracommRank] == rankAssignments[parentRank]);
1855     } else {
1856     ASSERT(-1 == rankAssignments[parentRank]);
1857     }
1858     ASSERT(j == numIORanks);
1859    
1860 dimitri 1.2
1861 dimitri 1.1 if (0 == parentRank) {
1862 dimitri 1.2 printf ("Create communicators for epoch style %d\n", epochStyleIndex);
1863 dimitri 1.1 }
1864 dimitri 1.2 fflush (stdout);
1865    
1866 dimitri 1.1
1867     // Find the lowest rank assigned to computation; use it as
1868     // the "root" for the upcoming intercomm creates.
1869     for (compRoot = 0; rankAssignments[compRoot] != -1; ++compRoot);
1870     // Sanity check
1871     if ((isCompute == thisRanksType) && (0 == newIntracommRank)) ASSERT(compRoot == parentRank);
1872    
1873     // If this is an I/O rank, split the newIntracomm (which, for
1874     // the i/o ranks, is a communicator among all the i/o ranks)
1875     // into the pieces as assigned.
1876    
1877     if (isIO == thisRanksType) {
1878     MPI_Comm fieldIntracomm;
1879     int myField = rankAssignments[parentRank];
1880     ASSERT(myField >= 0);
1881    
1882     MPI_Comm_split(newIntracomm, myField, parentRank, &fieldIntracomm);
1883     thisEpochStyle[myField].ioRanksIntracomm = fieldIntracomm;
1884    
1885     // Now create an inter-communicator between the computational
1886     // ranks, and each of the newly split off field communicators.
1887     for (i = 0; i < nF; ++i) {
1888    
1889     // Do one field at a time to avoid clashes on parentComm
1890     MPI_Barrier(newIntracomm);
1891    
1892     if (myField != i) continue;
1893    
1894     // Create the intercomm for this field for this epoch style
1895     MPI_Intercomm_create(fieldIntracomm, 0, parentComm,
1896     compRoot, i, &(thisEpochStyle[myField].dataIntercomm));
1897    
1898     // ... and dup a separate one for tile registrations
1899     MPI_Comm_dup(thisEpochStyle[myField].dataIntercomm,
1900     &(thisEpochStyle[myField].registrationIntercomm));
1901 dimitri 1.2
1902    
1903     // Sanity check: make sure the chunkWriters array looks ok.
1904     {
1905     int ii, jj, commSize, nChunks = thisEpochStyle[myField].nChunks;
1906     ASSERT (nChunks > 0);
1907     ASSERT (thisEpochStyle[myField].chunkWriters != NULL);
1908     MPI_Comm_size (thisEpochStyle[myField].dataIntercomm, &commSize);
1909     for (ii = 0; ii < nChunks; ++ii) {
1910     ASSERT (thisEpochStyle[myField].chunkWriters[ii] >= 0);
1911     ASSERT (thisEpochStyle[myField].chunkWriters[ii] < commSize);
1912     for (jj = ii+1; jj < nChunks; ++jj) {
1913     ASSERT (thisEpochStyle[myField].chunkWriters[ii] !=
1914     thisEpochStyle[myField].chunkWriters[jj]);
1915     }
1916     }
1917     }
1918 dimitri 1.1 }
1919     }
1920     else {
1921     // This is a computational rank; create the intercommunicators
1922     // to the separate field communicators that were split off above.
1923    
1924     for (fieldIndex = 0; fieldIndex < nF; ++fieldIndex) {
1925    
1926     // Find the remote "root" process for this field
1927     int fieldRoot = -1;
1928     while (rankAssignments[++fieldRoot] != fieldIndex);
1929    
1930     // Create the intercomm for this field for this epoch style
1931     MPI_Intercomm_create(newIntracomm, 0, parentComm, fieldRoot,
1932     fieldIndex, &(thisEpochStyle[fieldIndex].dataIntercomm));
1933    
1934     // ... and dup a separate one for tile registrations
1935     MPI_Comm_dup(thisEpochStyle[fieldIndex].dataIntercomm,
1936     &(thisEpochStyle[fieldIndex].registrationIntercomm));
1937     }
1938     }
1939    
1940 dimitri 1.2
1941     // Print a "map" of the core assignments. Compute nodes are indicated
1942     // by a "#" for the whole node; i/o nodes have the individual cores
1943     // within the node broken out and their field assignment printed.
1944     // If the core is writing to the disk, the field assignment is printed
1945     // in parentheses.
1946     if (0 == parentRank) {
1947     int fieldIOCounts[nF];
1948     memset (fieldIOCounts, 0, nF*sizeof(int));
1949    
1950     printf ("Writer assignments, epoch style %d\n", epochStyleIndex);
1951     for (i = 0; i < nF; ++i) {
1952     fieldInfoThisEpoch_t *p = &(thisEpochStyle[i]);
1953     int chunkSize = divCeil(p->zDepth,p->nChunks);
1954     printf ("\n");
1955     printf ( "field %2d ('%c'): %1d levels, %1d writers, %1d"
1956     " levels per writer (last writer = %d levels)\n",
1957     i, p->dataFieldID, p->zDepth, p->nChunks,
1958     chunkSize, p->zDepth - ((p->nChunks - 1)*chunkSize));
1959     for (j = 0; j < thisEpochStyle[i].nChunks; ++j) {
1960     printf (" %1d", thisEpochStyle[i].chunkWriters[j]);
1961     }
1962     printf ("\n");
1963     }
1964     printf ("\n");
1965    
1966     printf("\ncpu assignments, epoch style %d\n", epochStyleIndex);
1967     int whichIONode = -1;
1968     for (i = 0; i < numComputeNodes + numIONodes ; ++i) {
1969    
1970     if (rankAssignments[i*numRanksPerNode] >= 0) {
1971    
1972     // i/o node
1973     ++whichIONode;
1974     printf("\n%s (%d Z, %ld bytes):",
1975     &(allHostnames[i*numRanksPerNode][0]),
1976     nToWrite[whichIONode],
1977     nToWrite[whichIONode]*twoDFieldSizeInBytes);
1978    
1979     for (j = 0; j < numRanksPerNode; ++j) {
1980    
1981     int assignedField = rankAssignments[i*numRanksPerNode + j];
1982     int k, commRank, nChunks, isWriter;
1983     nChunks = thisEpochStyle[assignedField].nChunks;
1984     commRank = fieldIOCounts[assignedField];
1985    
1986     isWriter = 0;
1987     for (k = 0; k < nChunks; ++k) {
1988     if (thisEpochStyle[assignedField].chunkWriters[k] == commRank) {
1989     isWriter = 1;
1990     break;
1991     }
1992     }
1993     printf(isWriter ? " (%1d)" : " %1d ", assignedField);
1994    
1995     fieldIOCounts[assignedField] += 1;
1996     }
1997     printf("\n");
1998    
1999     } else {
2000    
2001     // compute node
2002     for (j = 0; j < numRanksPerNode; ++j) {
2003     if ((i*numRanksPerNode + j) >= (numComputeRanks + numIORanks)) break;
2004     ASSERT(-1 == rankAssignments[i*numRanksPerNode + j]);
2005     }
2006     printf(" #");
2007     for (; j < numRanksPerNode; ++j) { // "excess" ranks (if any)
2008     printf("X");
2009     }
2010     }
2011     printf(" ");
2012     }
2013     printf("\n\n");
2014     }
2015    
2016    
2017    
2018 dimitri 1.1 } // epoch style loop
2019    
2020 dimitri 1.2 // Note: only non-null in rank 0, but it's ok to "free" NULL
2021     free(allHostnames);
2022    
2023 dimitri 1.1
2024     // I/O processes split off and start receiving data
2025     // NOTE: the I/O processes DO NOT RETURN from this call
2026 dimitri 1.2 if (isIO == thisRanksType) ioRankMain();
2027 dimitri 1.1
2028    
2029     // The "compute" processes now return with their new intra-communicator.
2030     *rtnComputeComm = newIntracomm;
2031    
2032     // but first, call the mpiio initialization routine!
2033     initSizesAndTypes();
2034    
2035 dimitri 1.2 fflush (stdout);
2036 dimitri 1.1 return;
2037     }
2038    
2039    
2040    
2041    
2042     // "Register" the tile-id(s) that this process will be sending
2043     void
2044     f2(int tileID)
2045     {
2046     int i, epochStyleIndex;
2047    
2048     static int count = 0;
2049    
2050     // This code assumes that the same tileID will apply to all the
2051     // fields. Note that we use the tileID as a tag, so we require it
2052     // be an int with a legal tag value, but we do *NOT* actually use
2053     // the tag *value* for anything (e.g. it is NOT used as an index
2054     // into an array). It is treated as an opaque handle that is
2055     // passed from the compute task(s) to the compositing routines.
2056     // Those two end-points likely assign meaning to the tileID (i.e.
2057     // use it to identify where the tile belongs within the domain),
2058     // but that is their business.
2059     //
2060     // A tileID of -1 signifies the end of registration.
2061    
2062     ASSERT(computeIntracommRank >= 0);
2063    
2064     if (tileID >= 0) {
2065     // In this version of the code, in addition to the tileID, we also
2066     // multiplex in the low order bit(s) of the epoch number as an
2067     // error check. So the tile value must be small enough to allow that.
2068     ASSERT(((tileID<<numCheckBits) + ((1<<numCheckBits)-1)) < maxTagValue);
2069    
2070     // In this version of the code, we do not send the actual tileID's
2071     // to the i/o processes during the registration procedure. We only
2072     // send a count of the number of tiles that we will send.
2073     ++count;
2074     return;
2075     }
2076    
2077    
2078     // We get here when we are called with a negative tileID, signifying
2079     // the end of registration. We now need to figure out and inform
2080     // *each* i/o process in *each* field just how many tiles we will be
2081     // sending them in *each* epoch style.
2082    
2083     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
2084     fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
2085    
2086     int curFieldIndex = -1;
2087     while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
2088     fieldInfoThisEpoch_t *thisField = &(thisEpochStyle[curFieldIndex]);
2089     int numRemoteRanks, *tileCounts, remainder;
2090     MPI_Comm_remote_size(thisField->dataIntercomm, &numRemoteRanks);
2091    
2092     tileCounts = alloca(numRemoteRanks * sizeof(int));
2093    
2094     memset(tileCounts,0,numRemoteRanks * sizeof(int));
2095    
2096     // Distribute the tiles among the i/o ranks.
2097     for (i = 0; i < numRemoteRanks; ++i) {
2098     tileCounts[i] = count / numRemoteRanks;
2099     }
2100     // Deal with any remainder
2101     remainder = count - ((count / numRemoteRanks) * numRemoteRanks);
2102     for (i = 0; i < remainder; ++i) {
2103     int target = (computeIntracommRank + i) % numRemoteRanks;
2104     tileCounts[target] += 1;
2105     }
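            // [Editor's note -- example, not original: if this compute rank
            //  registered count = 10 tiles and the field has
            //  numRemoteRanks = 4 i/o ranks, each i/o rank is promised
            //  10/4 = 2 tiles, and the remainder of 2 goes to i/o ranks
            //  (computeIntracommRank + 0) % 4 and
            //  (computeIntracommRank + 1) % 4, one extra tile each.]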
2106    
2107     // Communicate these counts to the i/o processes for this
2108     // field. Note that we send a message to each process,
2109     // even if the count is zero.
2110     for (i = 0; i < numRemoteRanks; ++i) {
2111     MPI_Send(NULL, 0, MPI_BYTE, i, tileCounts[i],
2112     thisField->registrationIntercomm);
2113     }
2114    
2115     } // field loop
2116     } // epoch-style loop
2117    
2118     }
2119    
2120    
2121    
2122    
2123     int currentEpochID = 0;
2124     int currentEpochStyle = 0;
2125    
2126     void
2127     beginNewEpoch(int newEpochID, int gcmIter, int epochStyle)
2128     {
2129     fieldInfoThisEpoch_t *p;
2130    
2131     if (newEpochID != (currentEpochID + 1)) {
2132     fprintf(stderr, "ERROR: Missing i/o epoch? was %d, is now %d ??\n",
2133     currentEpochID, newEpochID);
2134     sleep(1); // Give the message a chance to propagate
2135     abort();
2136     }
2137    
2138 dimitri 1.2
2139    
2140     // Not necessary for correctness, but it makes the timings more meaningful
2141     MPI_Barrier(computeIntracomm);
2142     if (0 == computeIntracommRank)
2143     fprintf(stderr,"compute ready to emit %d %d %f\n",newEpochID,gcmIter,MPI_Wtime());
2144    
2145    
2146    
2147 dimitri 1.1 ////////////////////////////////////////////////////////////////////////
2148     // Need to be sure the i/o tasks are done processing the previous epoch
2149     // before any compute tasks start sending tiles from a new epoch.
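    // [Editor's note -- a sketch of the handshake, not original text:
    //  compute rank 0 first waits for a zero-byte "cmd_epochComplete"
    //  ack from i/o rank 0 on globalIntercomm, then sends the 4-int
    //  { cmd_newEpoch, newEpochID, epochStyle, gcmIter } command; the
    //  remaining compute ranks simply wait at the barrier below.]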
2150    
2151     if (0 == computeIntracommRank) {
2152     int cmd[4] = { cmd_newEpoch, newEpochID, epochStyle, gcmIter };
2153    
2154     // Wait to get an ack that the i/o tasks are done with the prior epoch.
2155     MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
2156     globalIntercomm, MPI_STATUS_IGNORE);
2157    
2158     // Tell the i/o ranks about the new epoch.
2159     // (Just tell rank 0; it will bcast to the others)
2160     MPI_Send(cmd, 4, MPI_INT, 0, 0, globalIntercomm);
2161     }
2162    
2163     // Compute ranks wait here until rank 0 gets the ack from the i/o ranks
2164     MPI_Barrier(computeIntracomm);
2165    
2166 dimitri 1.2 if (0 == computeIntracommRank)
2167     fprintf(stderr,"compute handshake completed %d %d %f\n",newEpochID,gcmIter,MPI_Wtime());
2168    
2169 dimitri 1.1
2170     currentEpochID = newEpochID;
2171     currentEpochStyle = epochStyle;
2172    
2173     // Reset the tileCount (used by f3)
2174     for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
2175     p->tileCount = 0;
2176     }
2177     }
2178    
2179    
2180     void
2181     f3(char dataFieldID, int tileID, int epochID, void *data)
2182     {
2183     int whichField, receiver, tag;
2184    
2185     static char priorDataFieldID = '\0';
2186     static int priorEpoch = -1;
2187     static int remoteCommSize;
2188     static fieldInfoThisEpoch_t *p;
2189    
2190     int flag=0;
2191    
2192     // Check that this global has been set
2193     ASSERT(computeIntracommRank >= 0);
2194    
2195    
2196     // Figure out some info about this dataFieldID. It is
2197     // likely to be another tile from the same field as last time
2198     // we were called, in which case we can reuse the "static" values
2199     // retained from the prior call. If not, we need to look it up.
2200    
2201     if ((dataFieldID != priorDataFieldID) || (epochID != priorEpoch)) {
2202    
2203     // It's not the same; we need to look it up.
2204    
2205     for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
2206     if (p->dataFieldID == dataFieldID) break;
2207     }
2208     // Make sure we found a valid field
2209     ASSERT(p->dataIntercomm != MPI_COMM_NULL);
2210    
2211     MPI_Comm_remote_size(p->dataIntercomm, &remoteCommSize);
2212    
2213     flag=1;
2214    
2215     }
2216    
2217     ASSERT(p->dataFieldID == dataFieldID);
2218    
2219     receiver = (computeIntracommRank + p->tileCount) % remoteCommSize;
2220     tag = (tileID << numCheckBits) | (epochID & bitMask);
2221    
2222     FPRINTF(stderr,"Rank %d sends field '%c', tile %d, to i/o task %d with tag %d(%d)\n",
2223     computeIntracommRank, dataFieldID, tileID, receiver, tag, tag >> numCheckBits);
2224    
2225     MPI_Send(data, tileOneZLevelSizeInBytes * p->zDepth,
2226     MPI_BYTE, receiver, tag, p->dataIntercomm);
2227    
2228     p->tileCount += 1;
2229     priorDataFieldID = dataFieldID;
2230     priorEpoch = epochID;
2231     }
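
// [Editor's note: illustrative sketch added in editing, not part of the
//  original revision, and disabled with "#if 0".  It shows how the tag
//  used by f2/f3 multiplexes the tileID with the low-order epoch bits
//  (numCheckBits = 2, bitMask = 3), and how a receiver could split the
//  two apart again.]
#if 0
static void
demoTagEncoding (void)
{
    int tileID  = 37;
    int epochID = 5;

    // Encode, exactly as f3 does
    int tag = (tileID << numCheckBits) | (epochID & bitMask);   // 149

    // Decode (what the i/o side would presumably do)
    int rcvdTileID     = tag >> numCheckBits;                   // 37
    int rcvdEpochCheck = tag & bitMask;                         // 1 == (5 & bitMask)

    printf ("tag %d -> tileID %d, epoch check bits %d\n",
            tag, rcvdTileID, rcvdEpochCheck);
}
#endif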
2232    
2233    
2234    
2235     void
2236     f4(int epochID)
2237     {
2238     int i;
2239     ASSERT(computeIntracommRank >= 0);
2240    
2241     if (0 == computeIntracommRank) {
2242     int cmd[3] = { cmd_exit, -1, -1 };
2243    
2244     // Recv the ack that the prior i/o epoch is complete
2245     MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
2246     globalIntercomm, MPI_STATUS_IGNORE);
2247    
2248     // Tell the i/o ranks to exit
2249     // Just tell rank 0; it will bcast to the others
2250     MPI_Send(cmd, 3, MPI_INT, 0, 0, globalIntercomm);
2251     }
2252    
2253     }
2254    
2255    
2256    
2257 dimitri 1.2 void asyncio_tile_arrays_(int *xoff, int *yoff, int *xskip)
2258 dimitri 1.1 {
2259 dimitri 1.2 int rank, ierr, rootProc;
2260 dimitri 1.1
2261     MPI_Comm_rank(globalIntercomm,&rank);
2262 dimitri 1.2 rootProc = (0 == rank) ? MPI_ROOT : MPI_PROC_NULL;
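    // [Editor's note, added for clarity: in an inter-communicator
    //  broadcast, the sending group passes MPI_ROOT in exactly one rank
    //  and MPI_PROC_NULL in all the others, while the receiving group
    //  passes the rank of the root within the remote group.  Here the
    //  compute side sends, and the i/o side presumably receives with
    //  root = 0.]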
2263 dimitri 1.1
2264 dimitri 1.2 ierr = MPI_Bcast (xoff, TOTAL_NUM_TILES, MPI_INT, rootProc, globalIntercomm);
2265     ierr = MPI_Bcast (yoff, TOTAL_NUM_TILES, MPI_INT, rootProc, globalIntercomm);
2266     ierr = MPI_Bcast (xskip, TOTAL_NUM_TILES, MPI_INT, rootProc, globalIntercomm);
2267 dimitri 1.1 }
2268    
2269    
2270    
2271    
2272    
2273     //////////////////////////////////////////////////////////////////////
2274     // Fortran interface
2275    
2276     void
2277     bron_f1(
2278     MPI_Fint *ptr_parentComm,
2279     int *ptr_numComputeRanks,
2280 dimitri 1.2 int *ptr_numTiles,
2281 dimitri 1.1 MPI_Fint *rtnComputeComm)
2282     {
2283     // Convert the Fortran handle into a C handle
2284     MPI_Comm newComm, parentComm = MPI_Comm_f2c(*ptr_parentComm);
2285    
2286 dimitri 1.2 f1(parentComm, *ptr_numComputeRanks, *ptr_numTiles, &newComm);
2287 dimitri 1.1
2288     // Convert the C handle into a Fortran handle
2289     *rtnComputeComm = MPI_Comm_c2f(newComm);
2290     }
2291    
2292    
2293    
2294     void
2295     bron_f2(int *ptr_tileID)
2296     {
2297     f2(*ptr_tileID);
2298     }
2299    
2300    
2301     void
2302     beginnewepoch_(int *newEpochID, int *gcmIter, int *epochStyle)
2303     {
2304     beginNewEpoch(*newEpochID, *gcmIter, *epochStyle);
2305     }
2306    
2307    
2308     void
2309     bron_f3(char *ptr_dataFieldID, int *ptr_tileID, int *ptr_epochID, void *data)
2310     {
2311     f3(*ptr_dataFieldID, *ptr_tileID, *ptr_epochID, data);
2312     }
2313    
2314    
2315    
2316     void
2317     bron_f4(int *ptr_epochID)
2318     {
2319     f4(*ptr_epochID);
2320     }
2321    
2322 dimitri 1.2
