1 """Code for controlling a single block of a LB simulation."""
2
3 __author__ = 'Michal Januszewski'
4 __email__ = 'sailfish-cfd@googlegroups.com'
5 __license__ = 'LGPL3'
6
7 from collections import defaultdict, namedtuple
8 import math
9 import operator
10 import numpy as np
11 import zmq
12 from sailfish import codegen, util
13
14 # Used to hold a reference to a CUDA kernel and a grid on which it is
15 # to be executed.
16 KernelGrid = namedtuple('KernelGrid', 'kernel grid')
17 TimingInfo = namedtuple('TimingInfo', 'comp data recv send total block_id')
18
19
20 class ConnectionBuffer(object):
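    """Aggregates the host and GPU buffers used to exchange data with a
    single connected block via one face."""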
    def __init__(self, face, cpair, coll_buf, coll_idx, recv_buf,
            dist_partial_buf, dist_partial_idx, dist_partial_sel,
            dist_full_buf, dist_full_idx):
        self.face = face
        self.cpair = cpair
        self.coll_buf = coll_buf
        self.coll_idx = coll_idx
        self.recv_buf = recv_buf
        self.dist_partial_buf = dist_partial_buf
        self.dist_partial_idx = dist_partial_idx
        self.dist_partial_sel = dist_partial_sel
        self.dist_full_buf = dist_full_buf
        self.dist_full_idx = dist_full_idx

    def distribute(self, backend):
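        """Moves data received into recv_buf to the partial and full
        distribution buffers and copies them to the compute device."""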
        if self.dist_partial_sel is not None:
            self.dist_partial_buf.host[:] = self.recv_buf[self.dist_partial_sel]
            backend.to_buf(self.dist_partial_buf.gpu)

        if self.cpair.dst.dst_slice:
            slc = [slice(0, self.recv_buf.shape[0])] + list(reversed(self.cpair.dst.dst_full_buf_slice))
            self.dist_full_buf.host[:] = self.recv_buf[slc]
            backend.to_buf(self.dist_full_buf.gpu)


class GPUBuffer(object):
    """Numpy array and a corresponding GPU buffer."""
    def __init__(self, host_buffer, backend):
        self.host = host_buffer
        if host_buffer is not None:
            self.gpu = backend.alloc_buf(like=host_buffer)
        else:
            self.gpu = None


class BlockRunner(object):
    """Runs the simulation for a single LBBlock."""
    def __init__(self, simulation, block, output, backend, quit_event,
            summary_addr=None):
        # Create a 2-way connection between the LBBlock and this BlockRunner
        self._ctx = zmq.Context()
        if summary_addr is not None:
            self._summary_sender = self._ctx.socket(zmq.REQ)
            self._summary_sender.connect(summary_addr)
        else:
            self._summary_sender = None

        self._block = block
        block.runner = self

        self._output = output
        self.backend = backend

        self._bcg = codegen.BlockCodeGenerator(simulation)
        self._sim = simulation

        if self._bcg.is_double_precision():
            self.float = np.float64
        else:
            self.float = np.float32

        self._scalar_fields = []
        self._vector_fields = []
        self._gpu_field_map = {}
        self._gpu_grids_primary = []
        self._gpu_grids_secondary = []
        self._vis_map_cache = None
        self._quit_event = quit_event

        for b_id, connector in self._block._connectors.iteritems():
            connector.init_runner(self._ctx)

    @property
    def config(self):
        return self._sim.config

    @property
    def dim(self):
        return self._block.dim

    def update_context(self, ctx):
        """Called by the codegen module."""
        self._block.update_context(ctx)
        self._geo_block.update_context(ctx)
        ctx.update(self.backend.get_defines())

        # Size of the lattice.
        ctx['lat_ny'] = self._lat_size[-2]
        ctx['lat_nx'] = self._lat_size[-1]

        # Actual size of the array, including any padding.
        ctx['arr_nx'] = self._physical_size[-1]
        ctx['arr_ny'] = self._physical_size[-2]

        bnd_limits = list(self._block.actual_size[:])

        if self.dim == 3:
            ctx['lat_nz'] = self._lat_size[-3]
            ctx['arr_nz'] = self._physical_size[-3]
            periodic_z = int(self._block.periodic_z)
        else:
            ctx['lat_nz'] = 1
            ctx['arr_nz'] = 1
            periodic_z = 0
            bnd_limits.append(1)

        ctx['lat_linear'] = self.lat_linear
        ctx['lat_linear_dist'] = self.lat_linear_dist

        ctx['periodic_x'] = 0 #int(self._block.periodic_x)
        ctx['periodic_y'] = 0 #int(self._block.periodic_y)
        ctx['periodic_z'] = 0 #periodic_z
        ctx['periodicity'] = [0, 0, 0]

#[int(self._block.periodic_x), int(self._block.periodic_y), periodic_z]

        ctx['bnd_limits'] = bnd_limits
        ctx['dist_size'] = self._get_nodes()
        ctx['sim'] = self._sim
        ctx['block'] = self._block

        # FIXME Additional constants.
        ctx['constants'] = []

        arr_nx = self._physical_size[-1]
        arr_ny = self._physical_size[-2]

        ctx['pbc_offsets'] = [{-1:  self.config.lat_nx,
                                1: -self.config.lat_nx},
                              {-1:  self.config.lat_ny * arr_nx,
                                1: -self.config.lat_ny * arr_nx}]

        if self._block.dim == 3:
            ctx['pbc_offsets'].append(
                              {-1:  self.config.lat_nz * arr_ny * arr_nx,
                                1: -self.config.lat_nz * arr_ny * arr_nx})

    def add_visualization_field(self, field_cb, name):
        self._output.register_field(field_cb, name, visualization=True)

    def make_scalar_field(self, dtype=None, name=None, register=True):
163         """Allocates a scalar NumPy array.
164
165         The array includes padding adjusted for the compute device (hidden from
166         the end user), as well as space for any ghost nodes.  The returned
167         object is a view into the underlying array that hides all ghost nodes.
168         Ghost nodes can still be accessed via the 'base' attribute of the
169         returned ndarray.
170
171         :param register: if True, the field will be registered for output and
172             for automated creation of equivalent field on the compute device.
173         """
174         if dtype is None:
175             dtype = self.float
176
177         size = self._get_nodes()
178         strides = self._get_strides(dtype)
179
180         field = np.ndarray(self._physical_size, buffer=np.zeros(size, dtype=dtype),
181                            dtype=dtype, strides=strides)
182         fview = field[self._block._nonghost_slice]
183
184         if name is not None and register:
185             self._output.register_field(fview, name)
186
187         if register:
188             self._scalar_fields.append(fview)
189         return fview
190
191     def make_vector_field(self, name=None, output=False):
192         """Allocates several scalar arrays representing a vector field."""
193         components = []
194         view_components = []
195
196         for x in range(0, self._block.dim):
197             field = self.make_scalar_field(self.float, register=False)
198             components.append(field)
199
200         if name is not None:
201             self._output.register_field(components, name)
202
203         self._vector_fields.append(components)
204         return components
205
206     def visualization_map(self):
        if self._vis_map_cache is None:
            self._vis_map_cache = self._geo_block.visualization_map()
        return self._vis_map_cache

    def _init_geometry(self):
        self.config.logger.debug("Initializing geometry.")
        self._init_shape()
        self._geo_block = self._sim.geo(self._global_size, self._block,
                                        self._sim.grid)
        self._geo_block.reset()

    def _init_shape(self):
        # Logical size of the lattice (including ghost nodes).
        # X dimension is the last one on the list (nz, ny, nx)
        self._lat_size = list(reversed(self._block.actual_size))

        # Physical in-memory size of the lattice, adjusted for optimal memory
        # access from the compute unit.  Size of the X dimension is rounded up
        # to a multiple of block_size.  Order is [nz], ny, nx
        self._physical_size = list(reversed(self._block.actual_size))
        bs = self.config.block_size
        self._physical_size[-1] = int(math.ceil(float(self._physical_size[-1]) / bs)) * bs

        # CUDA block/grid size for standard kernel call.
        if self._block.dim == 2:
            self._kernel_grid_size = list(reversed(self._physical_size))
        else:
            self._kernel_grid_size = [self._physical_size[2] *
                    self._physical_size[1],  self._physical_size[0]]
        self._kernel_grid_size[0] /= self.config.block_size

        self._kernel_block_size = (self.config.block_size, 1)

        # Global grid size as seen by the simulation class.
        if self._block.dim == 2:
            self._global_size = (self.config.lat_ny, self.config.lat_nx)
        else:
            self._global_size = (self.config.lat_nz, self.config.lat_ny,
                    self.config.lat_nx)

        # Used so that face values map to the limiting coordinate
        # along a specific axis, e.g. lat_linear[_X_LOW] = 0
        # TODO(michalj): Should this use _block.envelope_size instead of -1?
        self.lat_linear = [0, self._lat_size[-1]-1, 0, self._lat_size[-2]-1]
        self.lat_linear_dist = [self._lat_size[-1]-2, 1,  self._lat_size[-2]-2, 1]

        if self._block.dim == 3:
            self.lat_linear.extend([0, self._lat_size[-3]-1])
            self.lat_linear_dist.extend([self._lat_size[-3]-2, 1])

    def _get_strides(self, type_):
        """Returns a list of strides for the NumPy array storing the lattice."""
        t = type_().nbytes
        return list(reversed(reduce(lambda x, y: x + [x[-1] * y],
                self._physical_size[-1:0:-1], [t])))

    def _get_nodes(self):
264         """Returns the total amount of actual nodes in the lattice."""
265         return reduce(operator.mul, self._physical_size)
266
267     def _get_dist_bytes(self, grid):
268         """Returns the number of bytes required to store a single set of
269            distributions for the whole simulation domain."""
270         return self._get_nodes() * grid.Q * self.float().nbytes
271
272     def _get_compute_code(self):
273         return self._bcg.get_code(self)
274
275     def _get_global_idx(self, location, dist_num):
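        """Returns a global index into a distributions array for the node at
        `location` and distribution number `dist_num`."""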
        if self.dim == 2:
            gx, gy = location
            arr_nx = self._physical_size[1]
            return gx + arr_nx * gy + (self._get_nodes() * dist_num)
        else:
            gx, gy, gz = location
            arr_nx = self._physical_size[2]
            arr_ny = self._physical_size[1]
            return ((gx + arr_nx * gy + arr_nx * arr_ny * gz) +
                    (self._get_nodes() * dist_num))

    def _idx_helper(self, gx, buf_slice, dists):
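        """Builds an array of global indices for the nodes selected by
        `buf_slice` at a fixed X coordinate `gx`, for every distribution
        in `dists`."""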
        sel = [slice(0, len(dists))]
        idx = np.mgrid[sel + list(reversed(buf_slice))].astype(np.uint32)
        for i, dist_num in enumerate(dists):
            idx[0][i,:] = dist_num
        if self.dim == 2:
            return self._get_global_idx((gx, idx[1]), idx[0]).astype(np.uint32)
        else:
            return self._get_global_idx((gx, idx[2], idx[1]),
                    idx[0]).astype(np.uint32)

    def _get_src_slice_indices(self, face, cpair):
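        """Returns global indices of the nodes from which data is collected
        for a connection along the X axis; None for other faces."""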
        if face in (self._block._X_LOW, self._block._X_HIGH):
            gx = self.lat_linear[face]
        else:
            return None
        return self._idx_helper(gx, cpair.src.src_slice, cpair.src.dists)

    def _get_dst_slice_indices(self, face, cpair):
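        """Returns global indices of the nodes to which the full buffer data
        is distributed for a connection along the X axis; None if there is
        no dst_slice or for other faces."""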
        if not cpair.dst.dst_slice:
            return None
        if face in (self._block._X_LOW, self._block._X_HIGH):
            gx = self.lat_linear_dist[self._block.opposite_face(face)]
        else:
            return None
        es = self._block.envelope_size
        dst_slice = [
                slice(x.start + es, x.stop + es) for x in
                cpair.dst.dst_slice]
        return self._idx_helper(gx, dst_slice, cpair.dst.dists)

    def _dst_face_loc_to_full_loc(self, face, face_loc):
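        """Maps a location in the in-face coordinate system to a full lattice
        location by filling in the coordinate along the connection axis."""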
        axis = self._block.face_to_axis(face)
        missing_loc = self.lat_linear_dist[self._block.opposite_face(face)]
        if axis == 0:
            return [missing_loc] + face_loc
        elif axis == 1:
            if self.dim == 2:
                return (face_loc[0], missing_loc)
            else:
                return (face_loc[0], missing_loc, face_loc[1])
        elif axis == 2:
            return face_loc + [missing_loc]

    def _get_partial_dst_indices(self, face, cpair):
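        """Returns a host buffer for partial nodes, an array of global
        distribution indices for the GPU scatter kernel, and a selector
        indexing the receive buffer; (None, None, None) if there are no
        partial nodes."""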
        if cpair.dst.partial_nodes == 0:
            return None, None, None
        buf = np.zeros(cpair.dst.partial_nodes, dtype=self.float)
        idx = np.zeros(cpair.dst.partial_nodes, dtype=np.uint32)
        dst_low = [x + self._block.envelope_size for x in cpair.dst.dst_low]
        sel = []
        i = 0
        for dist_num, locations in sorted(cpair.dst.dst_partial_map.items()):
            for loc in locations:
                dst_loc = [x + y for x, y in zip(dst_low, loc)]
                dst_loc = self._dst_face_loc_to_full_loc(face, dst_loc)

                # Reverse 'loc' here to go from natural order (x, y, z) to the
                # in-face buffer order z, y, x
                sel.append([cpair.dst.dists.index(dist_num)] + list(reversed(loc)))
                idx[i] = self._get_global_idx(dst_loc, dist_num)
                i += 1
        sel2 = []
        for i in range(0, len(sel[0])):
            sel2.append([])

        for loc in sel:
            for i, coord in enumerate(loc):
                sel2[i].append(coord)

        return buf, idx, sel2

    def _init_buffers(self):
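        """Creates the host/GPU buffers used for inter-block communication,
        one ConnectionBuffer per connection."""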
        # TODO(michalj): Fix this for multi-grid models.
        grid = util.get_grid_from_config(self.config)

        # Maps block ID to a list of 1 or 2 ConnectionBuffer
        # objects.  The list contains 2 elements only when global
        # periodic boundary conditions are enabled.
        self._block_to_connbuf = defaultdict(list)
        for face, block_id in self._block.connecting_blocks():
            cpair = self._block.get_connection(face, block_id)

            # Buffers for collecting and sending information.
            # TODO(michalj): Optimize this by providing proper padding.
            coll_buf = np.zeros(cpair.src.transfer_shape, dtype=self.float)
            coll_idx = self._get_src_slice_indices(face, cpair)

            # Buffers for receiving and distributing information.
            recv_buf = np.zeros(cpair.dst.transfer_shape, dtype=self.float)
            # Any partial dists are serialized into a single contiguous buffer.
            dist_partial_buf, dist_partial_idx, dist_partial_sel = \
                    self._get_partial_dst_indices(face, cpair)
            dist_full_buf = np.zeros(cpair.dst.full_shape, dtype=self.float)
            dist_full_idx = self._get_dst_slice_indices(face, cpair)

            cbuf = ConnectionBuffer(face, cpair,
                    GPUBuffer(coll_buf, self.backend),
                    GPUBuffer(coll_idx, self.backend),
                    recv_buf,
                    GPUBuffer(dist_partial_buf, self.backend),
                    GPUBuffer(dist_partial_idx, self.backend),
                    dist_partial_sel,
                    GPUBuffer(dist_full_buf, self.backend),
                    GPUBuffer(dist_full_idx, self.backend))

            self.config.logger.debug('adding buffer for conn: {0} -> {1} '
                    '(face {2})'.format(self._block.id, block_id, face))
            self._block_to_connbuf[block_id].append(cbuf)

    def _init_compute(self):
398         self.config.logger.debug("Initializing compute unit.")
399         code = self._get_compute_code()
400         self.module = self.backend.build(code)
401
402
403         self._boundary_streams = (self.backend.make_stream(),
404                                   self.backend.make_stream())
405         self._bulk_stream = self.backend.make_stream()
406
407     def _init_gpu_data(self):
408         self.config.logger.debug("Initializing compute unit data.")
409
410         for field in self._scalar_fields:
411             self._gpu_field_map[id(field)] = self.backend.alloc_buf(like=field.base)
412
413         for field in self._vector_fields:
414             gpu_vector = []
415             for component in field:
416                 gpu_vector.append(self.backend.alloc_buf(like=component.base))
417             self._gpu_field_map[id(field)] = gpu_vector
418
419         for grid in self._sim.grids:
420             size = self._get_dist_bytes(grid)
421             self._gpu_grids_primary.append(self.backend.alloc_buf(size=size))
422             self._gpu_grids_secondary.append(self.backend.alloc_buf(size=size))
423
424         self._gpu_geo_map = self.backend.alloc_buf(
425                 like=self._geo_block.encoded_map())
426
427     def gpu_field(self, field):
428         """Returns the GPU copy of a field."""
429         return self._gpu_field_map[id(field)]
430
431     def gpu_dist(self, num, copy):
432         """Returns a GPU dist array."""
433         if copy == 0:
434             return self._gpu_grids_primary[num]
435         else:
436             return self._gpu_grids_secondary[num]
437
438     def gpu_geo_map(self):
439         return self._gpu_geo_map
440
441     def get_kernel(self, name, args, args_format, block_size=None):
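        """Builds and returns a kernel from the compute module, using the
        default block size unless `block_size` is specified."""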
        if block_size is None:
            block = self._kernel_block_size
        else:
            block = block_size
        return self.backend.get_kernel(self.module, name, args=args,
                args_format=args_format, block=block)

    def exec_kernel(self, name, args, args_format):
        kernel = self.get_kernel(name, args, args_format)
        self.backend.run_kernel(kernel, self._kernel_grid_size)

    def step(self, output_req):
        # call _step_boundary here
        self._step_bulk(output_req)
        self._sim.iteration += 1

    def _step_bulk(self, output_req):
        """Runs one simulation step in the bulk domain.

        Bulk domain is defined to be all nodes that belong to CUDA
        blocks that do not depend on input from any ghost nodes.
        """
        if output_req:
            kernel = self._kernels_full[self._sim.iteration & 1]
        else:
            kernel = self._kernels_none[self._sim.iteration & 1]

        # TODO(michalj): Do we need the sync here?
        self.backend.sync()

        self.backend.run_kernel(kernel, self._kernel_grid_size)

        if self._sim.iteration & 1:
            base = 0
        else:
            base = 3

        # TODO(michalj): Do we need the sync here?
        self.backend.sync()

        if self._block.periodic_x:
            kernel = self._pbc_kernels[base]
            if self._block.dim == 2:
                grid_size = (
                        int(math.ceil(self._lat_size[0] /
                            float(self.config.block_size))), 1)
            else:
                grid_size = (
                        int(math.ceil(self._lat_size[1] /
                            float(self.config.block_size))),
                        self._lat_size[0])

            self.backend.run_kernel(kernel, grid_size)

        if self._block.periodic_y:
            kernel = self._pbc_kernels[base + 1]
            if self._block.dim == 2:
                grid_size = (
                        int(math.ceil(self._lat_size[1] /
                            float(self.config.block_size))), 1)
            else:
                grid_size = (
                        int(math.ceil(self._lat_size[2] /
                            float(self.config.block_size))),
                        self._lat_size[0])
            self.backend.run_kernel(kernel, grid_size)

        if self._block.dim == 3 and self._block.periodic_z:
            kernel = self._pbc_kernels[base + 2]
            grid_size = (
                    int(math.ceil(self._lat_size[2] /
                        float(self.config.block_size))),
                    self._lat_size[1])
            self.backend.run_kernel(kernel, grid_size)

        for kernel, grid in self._collect_kernels[self._sim.iteration & 1]:
            self.backend.run_kernel(kernel, grid)

    def _step_boundary(self):
521         """Runs one simulation step for the boundary blocks.
522
523         Boundary blocks are CUDA blocks that depend on input from
524         ghost nodes."""
525
526         stream = self._boundary_stream[self._step]

    def send_data(self):
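        """Copies the collect buffers from the compute device and sends
        their contents to all connected blocks."""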
        for b_id, connector in self._block._connectors.iteritems():
            conn_bufs = self._block_to_connbuf[b_id]
            for x in conn_bufs:
                self.backend.from_buf(x.coll_buf.gpu)

            if len(conn_bufs) > 1:
                connector.send(np.hstack(
                    [np.ravel(x.coll_buf.host) for x in conn_bufs]))
            else:
                connector.send(np.ravel(conn_bufs[0].coll_buf.host))

    def recv_data(self):
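        """Receives data from all connected blocks and distributes it into
        the GPU distribution arrays.

        connector.recv() is expected to block until data from the remote
        block arrives; it returns False only when the quit event is set, in
        which case we return early without distributing any data."""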
        for b_id, connector in self._block._connectors.iteritems():
            conn_bufs = self._block_to_connbuf[b_id]
            if len(conn_bufs) > 1:
                dest = np.hstack([np.ravel(x.recv_buf) for x in conn_bufs])
                # Returns false only if quit event is active.
                if not connector.recv(dest, self._quit_event):
                    return
                i = 0
                # In case there are 2 connections between the blocks, reverse the
                # order of subbuffers in the recv buffer.  Note that this implicitly
                # assumes the order of conn_bufs is the same for both blocks.
                # TODO(michalj): Consider explicitly sorting conn_bufs.
                for cbuf in reversed(conn_bufs):
                    l = cbuf.recv_buf.size
                    cbuf.recv_buf[:] = dest[i:i+l].reshape(cbuf.recv_buf.shape)
                    i += l
                    cbuf.distribute(self.backend)
            else:
                cbuf = conn_bufs[0]
                dest = np.ravel(cbuf.recv_buf)
                # Returns false only if quit event is active.
                if not connector.recv(dest, self._quit_event):
                    return
                # If ravel returned a copy, we need to write the data
                # back to the proper buffer.
                # TODO(michalj): Check if there is any way of avoiding this
                # copy.
                if dest.flags.owndata:
                    cbuf.recv_buf[:] = dest.reshape(cbuf.recv_buf.shape)
                cbuf.distribute(self.backend)

    def _fields_to_host(self):
        """Copies data for all fields from the GPU to the host."""
        for field in self._scalar_fields:
            self.backend.from_buf(self._gpu_field_map[id(field)])

        for field in self._vector_fields:
            for component in self._gpu_field_map[id(field)]:
                self.backend.from_buf(component)

    def _init_interblock_kernels(self):
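        """Creates the kernels used to collect and distribute data for all
        inter-block connections, for both copies of the distributions."""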
        # TODO(michalj): Extend this for multi-grid models.

        collect_primary = []
        collect_secondary = []

        distrib_primary = []
        distrib_secondary = []
        self._distrib_grid = []

        collect_block = 32
        def _grid_dim1(x):
            return int(math.ceil(x / float(collect_block)))

        for b_id, conn_bufs in self._block_to_connbuf.iteritems():
            for cbuf in conn_bufs:
                # Data collection.
                if cbuf.coll_idx.host is not None:
                    grid_size = (_grid_dim1(cbuf.coll_buf.host.size),)

                    def _get_sparse_coll_kernel(i):
                        return KernelGrid(
                            self.get_kernel('CollectSparseData',
                            [cbuf.coll_idx.gpu, self.gpu_dist(0, i),
                             cbuf.coll_buf.gpu, cbuf.coll_buf.host.size],
                            'PPPi', (collect_block,)),
                            grid_size)

                    collect_primary.append(_get_sparse_coll_kernel(1))
                    collect_secondary.append(_get_sparse_coll_kernel(0))
                else:
                    # [X, Z * dists] or [X, Y * dists]
                    min_max = ([x.start for x in cbuf.cpair.src.src_slice] +
                            list(reversed(cbuf.coll_buf.host.shape[1:])))
                    min_max[-1] = min_max[-1] * len(cbuf.cpair.src.dists)
                    if self.dim == 2:
                        signature = 'PiiiP'
                        grid_size = (_grid_dim1(cbuf.coll_buf.host.size),)
                    else:
                        signature = 'PiiiiiP'
                        grid_size = (_grid_dim1(cbuf.coll_buf.host.shape[-1]),
                            cbuf.coll_buf.host.shape[-2] * len(cbuf.cpair.src.dists))

                    def _get_cont_coll_kernel(i):
                        return KernelGrid(
                            self.get_kernel('CollectContinuousData',
                            [self.gpu_dist(0, i),
                             cbuf.face] + min_max + [cbuf.coll_buf.gpu],
                             signature, (collect_block,)),
                             grid_size)

                    collect_primary.append(_get_cont_coll_kernel(1))
                    collect_secondary.append(_get_cont_coll_kernel(0))

                # Data distribution
                # Partial nodes.
                if cbuf.dist_partial_idx.host is not None:
                    grid_size = (_grid_dim1(cbuf.dist_partial_buf.host.size),)

                    def _get_sparse_dist_kernel(i):
                        return KernelGrid(
                                self.get_kernel('DistributeSparseData',
                                    [cbuf.dist_partial_idx.gpu,
                                     self.gpu_dist(0, i),
                                     cbuf.dist_partial_buf.gpu,
                                     cbuf.dist_partial_buf.host.size],
                                    'PPPi', (collect_block,)),
                                grid_size)

                    distrib_primary.append(_get_sparse_dist_kernel(0))
                    distrib_secondary.append(_get_sparse_dist_kernel(1))

                # Full nodes (all transfer distributions).
                if cbuf.dist_full_buf.host is not None:
                    # Sparse indexing (connection along X-axis).
                    if cbuf.dist_full_idx.host is not None:
                        grid_size = (_grid_dim1(cbuf.dist_full_buf.host.size),)

                        def _get_sparse_fdist_kernel(i):
                            return KernelGrid(
                                    self.get_kernel('DistributeSparseData',
                                        [cbuf.dist_full_idx.gpu,
                                         self.gpu_dist(0, i),
                                         cbuf.dist_full_buf.gpu,
                                         cbuf.dist_full_buf.host.size],
                                    'PPPi', (collect_block,)),
                                    grid_size)

                        distrib_primary.append(_get_sparse_fdist_kernel(0))
                        distrib_secondary.append(_get_sparse_fdist_kernel(1))
                    # Continuous indexing.
                    elif cbuf.cpair.dst.dst_slice:
                        # [X, Z * dists] or [X, Y * dists]
                        low = [x + self._block.envelope_size for x in cbuf.cpair.dst.dst_low]
                        min_max = ([(x + y.start) for x, y in zip(low, cbuf.cpair.dst.dst_slice)] +
                                list(reversed(cbuf.dist_full_buf.host.shape[1:])))
                        min_max[-1] = min_max[-1] * len(cbuf.cpair.dst.dists)

                        if self.dim == 2:
                            signature = 'PiiiP'
                            grid_size = (_grid_dim1(cbuf.dist_full_buf.host.size),)
                        else:
                            signature = 'PiiiiiP'
                            grid_size = (_grid_dim1(cbuf.dist_full_buf.host.shape[-1]),
                                cbuf.dist_full_buf.host.shape[-2] * len(cbuf.cpair.dst.dists))

                        def _get_cont_dist_kernel(i):
                            return KernelGrid(
                                    self.get_kernel('DistributeContinuousData',
                                    [self.gpu_dist(0, i),
                                     self._block.opposite_face(cbuf.face)] +
                                    min_max + [cbuf.dist_full_buf.gpu],
                                    signature, (collect_block,)),
                                    grid_size)

                        distrib_primary.append(_get_cont_dist_kernel(0))
                        distrib_secondary.append(_get_cont_dist_kernel(1))

        self._collect_kernels = (collect_primary, collect_secondary)
        self._distrib_kernels = (distrib_primary, distrib_secondary)

    def _debug_get_dist(self, output=True):
703         """Copies the distributions from the GPU to a properly structured host array.
704         :param output: if True, returns the contents of the distributions set *after*
705                 the current simulation step
706         """
707         iter_idx = self._sim.iteration & 1
708         if not output:
709             iter_idx = 1 - iter_idx
710
711         self.config.logger.debug('getting dist {0} ({1})'.format(iter_idx,
712             self.gpu_dist(0, iter_idx)))
713         dbuf = np.zeros(self._get_dist_bytes(self._sim.grids[0]) / self.float().nbytes,
714             dtype=self.float)
715         self.backend.from_buf(self.gpu_dist(0, iter_idx), dbuf)
716         dbuf = dbuf.reshape([self._sim.grids[0].Q] + self._physical_size)
717         return dbuf
718
719     def _debug_set_dist(self, dbuf, output=True):
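        """Copies `dbuf` into one of the GPU distributions arrays.

        :param output: if True, overwrites the distributions set *after*
                the current simulation step.
        """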
720         iter_idx = self._sim.iteration & 1
721         if not output:
722             iter_idx = 1 - iter_idx
723
724         self.backend.to_buf(self.gpu_dist(0, iter_idx), dbuf)
725
726     def _debug_global_idx_to_tuple(self, gi):
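        """Decodes a global index into a (dist_num, gy, gx) tuple (2D case only)."""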
        dist_num = gi / self._get_nodes()
        rest = gi % self._get_nodes()
        arr_nx = self._physical_size[-1]
        gx = rest % arr_nx
        gy = rest / arr_nx
        return dist_num, gy, gx

    def run(self):
        self.config.logger.info("Initializing block.")

        self._init_geometry()
        self._init_buffers()
        self._init_compute()
        self.config.logger.debug("Initializing macroscopic fields.")
        self._sim.init_fields(self)
        self._geo_block.init_fields(self._sim)
        self._init_gpu_data()
        self.config.logger.debug("Applying initial conditions.")
        self._sim.initial_conditions(self)

        self._init_interblock_kernels()
        self._kernels_full = self._sim.get_compute_kernels(self, True)
        self._kernels_none = self._sim.get_compute_kernels(self, False)
        self._pbc_kernels = self._sim.get_pbc_kernels(self)

        if self.config.output:
            self._output.save(self._sim.iteration)

        self.config.logger.info("Starting simulation.")

        if not self.config.max_iters:
            self.config.logger.warning("Running infinite simulation.")

        if self.config.mode == 'benchmark':
            self.main_benchmark()
        else:
            self.main()

        self.config.logger.info(
            "Simulation completed after {0} iterations.".format(
                self._sim.iteration))

    def main(self):
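        """Main simulation loop: runs steps, exchanges data with connected
        blocks and saves output until max_iters is reached or termination
        is requested."""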
        while True:
            output_req = ((self._sim.iteration + 1) % self.config.every) == 0

            if output_req and self.config.debug_dump_dists:
                dbuf = self._debug_get_dist()
                self._output.dump_dists(dbuf, self._sim.iteration)

            self.step(output_req)
            self.send_data()

            if output_req and self.config.output_required:
                self._fields_to_host()
                self._output.save(self._sim.iteration)

            if (self.config.max_iters > 0 and self._sim.iteration >=
                    self.config.max_iters):
                break

            if self._quit_event.is_set():
                self.config.logger.info("Simulation termination requested.")
                break

            self.recv_data()
            if self._quit_event.is_set():
                self.config.logger.info("Simulation termination requested.")
                break

            for kernel, grid in self._distrib_kernels[self._sim.iteration & 1]:
                self.backend.run_kernel(kernel, grid)

    def main_benchmark(self):
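        """Runs the simulation loop while collecting timing statistics,
        periodically reporting MLUPS figures and finally sending a
        TimingInfo summary to the controller."""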
        t_comp = 0.0
        t_total = 0.0
        t_send = 0.0
        t_recv = 0.0
        t_data = 0.0

        import time

        for i in xrange(self.config.max_iters):
            output_req = ((self._sim.iteration + 1) % self.config.every) == 0

            t1 = time.time()
            self.step(output_req)
            t2 = time.time()

            self.send_data()

            t3 = time.time()
            if output_req:
                self._fields_to_host()
            t4 = time.time()

            self.recv_data()

            for kernel, grid in self._distrib_kernels[self._sim.iteration & 1]:
                self.backend.run_kernel(kernel, grid)

            t5 = time.time()

            t_comp += t2 - t1
            t_total += t5 - t1
            t_recv += t5 - t4
            t_send += t3 - t2
            t_data += t4 - t3

            if output_req:
                mlups_base = self._sim.iteration * reduce(operator.mul,
                             self._lat_size)
                mlups_total = mlups_base / t_total * 1e-6
                mlups_comp = mlups_base / t_comp * 1e-6
                self.config.logger.info(
                        'MLUPS eff:{0:.2f}  comp:{1:.2f}  overhead:{2:.3f}'.format(
                            mlups_total, mlups_comp, t_total / t_comp - 1.0))

                j = self._sim.iteration
                self.config.logger.debug(
                        'time comp:{0:e}  data:{1:e}  recv:{2:e}  send:{3:e}'
                        '  total:{4:e}'.format(
                            t_comp / j, t_data / j, t_recv / j, t_send / j,
                            t_total / j))

        mi = self.config.max_iters
        ti = TimingInfo(t_comp / mi, t_data / mi, t_recv / mi, t_send / mi,
                t_total / mi, self._block.id)
        if self._summary_sender is not None:
            self._summary_sender.send_pyobj(ti)
            self.config.logger.debug('Sending timing information to controller.')
            assert self._summary_sender.recv_pyobj() == 'ack'