topslave.F



[checkzpartbnd] [parallel_sum_mmnts] [parallel_sum_temperature] [parallelbarrier] [parallelbroadcastrealarray] [parallellor] [parallelmaxintegerarray] [parallelmaxrealarray] [parallelmaxrealarraycomm] [parallelminrealarray] [parallelminrealarraycomm] [parallelnonzerorealarray] [parallelsumintegerarray] [parallelsumrealarray] [parallelsumrealarraycomm] [particleboundaries_parallel] [reorgparticles_parallel]

#include top.h
  TOPSLAVE.M, version $Revision: 1.69 $, $Date: 2011/04/28 13:37:14 $
  Slave routines related to top package.
  D. P. Grote

[xparticleboundaries] [yparticleboundaries] [zparticleboundaries]
      subroutine particleboundaries_parallel(axis,pgroup,js1,js2,
     &                                       zmmax,zmmin,
     &                                       zpmaxlocal,zpminlocal,
     &                                       pboundleft,pboundright,
     &                                       lcountaslost,labs,lrz)
      use ParticleGroupmodule
      use Subtimerstop
      use GlobalVars
      use InGen,Only: clearlostpart
      use InPart
      use Parallel
      use Databuffers
      use Subcycling, Only: ndtstorho,zgridndts
      use DebugFlags
      integer(ISZ):: axis
      type(ParticleGroup):: pgroup
      integer(ISZ):: js1,js2
      real(kind=8):: zmmax,zmmin
      real(kind=8):: zpmaxlocal,zpminlocal
      integer(ISZ):: pboundleft,pboundright
      logical(ISZ):: lcountaslost,labs,lrz

  Impose boundary conditions based on zz. zz can be any coordinate.
  Note that zz must be one of the arrays in pgroup so that the rearranging
  below will affect it.
  axis is the axis of zz. 0 is x, 1 is y, 2 is z.
  Puts particles that exit to the left lower in the arrays and particles
  that exit to the right higher.  Keeps track of how many left and sends
  to the appropriate processor.
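  The exchange is repeated (the do while loop below) until every particle
  received lies within the local domain, so a particle that crosses more than
  one processor domain in a single step is passed along until it reaches the
  processor that owns it.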

      logical(ISZ):: ldoagain
      logical(ISZ):: isleftmostproc,isrightmostproc
      integer(ISZ):: ilower,ihigher,iter
      real(kind=8),pointer:: zz(:),uz(:),zbuff(:)
      real(kind=8):: ztemp,temp,pidtemp(pgroup%npid),syslen,zgrid,rr,rrin
      integer(ISZ):: is1,is2,is,ip,idest,ipid,ii,il,ir,nn,indts,i1,i2,iii13
      integer(MPIISZ):: ins_init(pgroup%ns),nps_init(pgroup%ns)
      integer(MPIISZ):: nsendleft(pgroup%ns),nsendright(pgroup%ns)
      integer(MPIISZ):: nrecvleft(pgroup%ns),nrecvright(pgroup%ns)
      integer(MPIISZ):: nsendleftsum,nsendrightsum,nrecvleftsum,nrecvrightsum
      integer(MPIISZ):: left_pe,right_pe
      integer(MPIISZ):: nquant,ierr

      include "mpif.h"
      integer(MPIISZ):: mpierror,mpistatus(MPI_STATUS_SIZE)
      integer(MPIISZ):: comm_world_mpiisz
      integer(MPIISZ),allocatable:: blocklengths(:),displacements(:)
      integer(MPIISZ):: mpinewtype,mpirequest
      integer(MPIISZ):: messid = 20

      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world

      --- If this dimension has zero length, then skip the boundary
      --- conditions since this will be an ignorable dimension.
      if (zmmin == zmmax) return
      if (zpminlocal == zpmaxlocal) return

#ifdef VAMPIR
      call VTSYMDEF(10, "ZBND", "SORT", IERR)
      call VTSYMDEF(20, "ZBND", "CPY1", IERR)
      call VTSYMDEF(30, "ZBND", "SNDN", IERR)
      call VTSYMDEF(40, "ZBND", "CHCK", IERR)
      call VTSYMDEF(50, "ZBND", "SNDP", IERR)
      call VTSYMDEF(60, "ZBND", "CPY2", IERR)
      call VTSYMDEF(70, "ZBND", "PLOR", IERR)
      call VTBEGIN(10,IERR)
#endif

      is1 = js1 + 1
      is2 = js2 + 1

      --- Get the neighboring processors
      left_pe = procneighbors(0,axis)
      right_pe = procneighbors(1,axis)
      isleftmostproc = (iprocgrid(axis) == 0)
      isrightmostproc = (iprocgrid(axis) == nprocgrid(axis) - 1)

      iter = 0
      ldoagain = .true.
      do while (ldoagain)
        --- Set the ldoagain flag to false. Later on, particles received are
        --- checked to see whether they are within the bounds; if any are not,
        --- ldoagain is set to true.
        ldoagain = .false.

        --- Quit if more iterations happen than the number of processors. This
        --- means that one or more particles are in some strange location
        --- that no domain covers.
        iter = iter + 1
        if (iter == nprocgrid(axis)+2) then
          call kaboom("particleboundaries_parallel: the domain for some particles cannot be found")
          return
        endif

        --- Set up pointers to the correct coordinate data.
        --- This must be done inside the while loop in case a gchange is
        --- done below (which would invalidate the pointer references)
        if (axis == 0) then
          zz => pgroup%xp
          uz => pgroup%uxp
        else if (axis == 1) then
          zz => pgroup%yp
          uz => pgroup%uyp
        else if (axis == 2) then
          zz => pgroup%zp
          uz => pgroup%uzp
        endif

        --- Loop over species
        do is=is1,is2
          if (axis == 2) then
            indts = ndtstorho(pgroup%ndts(is-1))
            zgrid = zgridndts(indts)
          else
            zgrid = 0.
          endif

          --- initialize counter and save particle's start and number
          ip = pgroup%ins(is)
          ins_init(is) = pgroup%ins(is)
          nps_init(is) = pgroup%nps(is)

          --- Loop over particles and pick out the ones which have ventured
          --- into the regions of neighboring processors.
          do while (ip < pgroup%ins(is) + pgroup%nps(is))

            if (lrz) then
              ztemp = sqrt(pgroup%xp(ip)**2 + pgroup%yp(ip)**2)
            else if (labs) then
              ztemp = abs(zz(ip) - zgrid)
            else
              ztemp = zz(ip) - zgrid
            endif

            --- If particle exited to the left, switch with lowest particle
            --- in the arrays.
            if ((ztemp <= zpminlocal .and. pboundleft == absorb) .or.
     &           ztemp < zpminlocal) then
              idest = pgroup%ins(is)
              pgroup%ins(is) = pgroup%ins(is) + 1
              pgroup%nps(is) = pgroup%nps(is) - 1

              temp = pgroup%xp(ip); pgroup%xp(ip) = pgroup%xp(idest); pgroup%xp(idest) = temp
              temp = pgroup%yp(ip); pgroup%yp(ip) = pgroup%yp(idest); pgroup%yp(idest) = temp
              temp = pgroup%zp(ip); pgroup%zp(ip) = pgroup%zp(idest); pgroup%zp(idest) = temp

              temp = pgroup%uxp(ip); pgroup%uxp(ip) = pgroup%uxp(idest); pgroup%uxp(idest) = temp
              temp = pgroup%uyp(ip); pgroup%uyp(ip) = pgroup%uyp(idest); pgroup%uyp(idest) = temp
              temp = pgroup%uzp(ip); pgroup%uzp(ip) = pgroup%uzp(idest); pgroup%uzp(idest) = temp

              temp = pgroup%gaminv(ip); pgroup%gaminv(ip) = pgroup%gaminv(idest); pgroup%gaminv(idest) = temp

              temp = pgroup%ex(ip); pgroup%ex(ip) = pgroup%ex(idest); pgroup%ex(idest) = temp
              temp = pgroup%ey(ip); pgroup%ey(ip) = pgroup%ey(idest); pgroup%ey(idest) = temp
              temp = pgroup%ez(ip); pgroup%ez(ip) = pgroup%ez(idest); pgroup%ez(idest) = temp

              temp = pgroup%bx(ip); pgroup%bx(ip) = pgroup%bx(idest); pgroup%bx(idest) = temp
              temp = pgroup%by(ip); pgroup%by(ip) = pgroup%by(idest); pgroup%by(idest) = temp
              temp = pgroup%bz(ip); pgroup%bz(ip) = pgroup%bz(idest); pgroup%bz(idest) = temp

              if (pgroup%npid > 0) then
                pidtemp = pgroup%pid(ip,:)
                pgroup%pid(ip,:) = pgroup%pid(idest,:)
                pgroup%pid(idest,:) = pidtemp
              endif

            --- If particle exited to the right, switch with highest particle
            --- in the arrays.
            elseif (ztemp >= zpmaxlocal) then
              idest = pgroup%ins(is) + pgroup%nps(is) - 1
              pgroup%nps(is) = pgroup%nps(is) - 1

              temp = pgroup%xp(ip); pgroup%xp(ip) = pgroup%xp(idest); pgroup%xp(idest) = temp
              temp = pgroup%yp(ip); pgroup%yp(ip) = pgroup%yp(idest); pgroup%yp(idest) = temp
              temp = pgroup%zp(ip); pgroup%zp(ip) = pgroup%zp(idest); pgroup%zp(idest) = temp

              temp = pgroup%uxp(ip); pgroup%uxp(ip) = pgroup%uxp(idest); pgroup%uxp(idest) = temp
              temp = pgroup%uyp(ip); pgroup%uyp(ip) = pgroup%uyp(idest); pgroup%uyp(idest) = temp
              temp = pgroup%uzp(ip); pgroup%uzp(ip) = pgroup%uzp(idest); pgroup%uzp(idest) = temp

              temp = pgroup%gaminv(ip); pgroup%gaminv(ip) = pgroup%gaminv(idest); pgroup%gaminv(idest) = temp

              temp = pgroup%ex(ip); pgroup%ex(ip) = pgroup%ex(idest); pgroup%ex(idest) = temp
              temp = pgroup%ey(ip); pgroup%ey(ip) = pgroup%ey(idest); pgroup%ey(idest) = temp
              temp = pgroup%ez(ip); pgroup%ez(ip) = pgroup%ez(idest); pgroup%ez(idest) = temp

              temp = pgroup%bx(ip); pgroup%bx(ip) = pgroup%bx(idest); pgroup%bx(idest) = temp
              temp = pgroup%by(ip); pgroup%by(ip) = pgroup%by(idest); pgroup%by(idest) = temp
              temp = pgroup%bz(ip); pgroup%bz(ip) = pgroup%bz(idest); pgroup%bz(idest) = temp

              if (pgroup%npid > 0) then
                pidtemp = pgroup%pid(ip,:)
                pgroup%pid(ip,:) = pgroup%pid(idest,:)
                pgroup%pid(idest,:) = pidtemp
              endif
              ip = ip - 1
            endif

            --- advance ip
            ip = ip + 1
          enddo

          --- Apply the appropriate boundary conditions on particles at the
          --- ends of the full mesh along this axis.

          i1 = ins_init(is)
          i2 = pgroup%ins(is) - 1
          if (isleftmostproc .and. i2 >= i1) then

            if (debug) then
              --- Check if a particle is outside of the particle grid but
              --- still within the field grid. This should never happen.
              do ip=i1,i2
                if (lrz) then
                  ztemp = sqrt(pgroup%xp(ip)**2 + pgroup%yp(ip)**2)
                else if (labs) then
                  ztemp = abs(zz(ip) - zgrid)
                else
                  ztemp = zz(ip) - zgrid
                endif

                if (zmmin <= ztemp .and. ztemp < zpminlocal) then
                  if (axis == 0) then
                    call kaboom("particleboundaries_parallel: there is
     &something wrong with the loadbalancing - a particle was found between
     &xmmin and xpminlocal")
                  else if (axis == 1) then
                    call kaboom("particleboundaries_parallel: there is
     &something wrong with the loadbalancing - a particle was found between
     &ymmin and ypminlocal")
                  else if (axis == 2) then
                    call kaboom("particleboundaries_parallel: there is
     &something wrong with the loadbalancing - a particle was found between
     &zmmin and zpminlocal")
                  endif
                endif
              enddo
            endif

            if (pboundleft==periodic) then
              --- The position is adjusted, and the data will still be passed
              --- to the appropriate neighbor.
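              --- The particle is shifted up by as many full system lengths
              --- as are needed to bring it back within the global bounds.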
              syslen = zmmax - zmmin
              do ip=i1,i2
                zz(ip) = zz(ip) + syslen*int((zmmax - (zz(ip) - zgrid))/syslen)
              enddo

            elseif (pboundleft==absorb) then
              --- gaminv is set to flag the particles as lost. The ins and nps
              --- counts are reset to include the lost particles so that they
              --- will be handled properly.
              if (lcountaslost) then
                pgroup%gaminv(i1:i2) = 0.
              else
                pgroup%gaminv(i1:i2) = -1.
              endif
              pgroup%nps(is) = pgroup%nps(is) + (pgroup%ins(is)-ins_init(is))
              pgroup%ins(is) = ins_init(is)

            elseif (pboundleft==reflect) then
              --- The position is adjusted, but the data is not sent.
              --- The ins and nps are reset to include them.
              --- Note that the lrz case is not needed, since r will never
              --- be less than zero.
              do ip=i1,i2
                zz(ip) = zmmin + (zmmin - (zz(ip) - zgrid)) + zgrid
                uz(ip) = -uz(ip)
              enddo
              pgroup%nps(is) = pgroup%nps(is) + (pgroup%ins(is) - ins_init(is))
              pgroup%ins(is) = ins_init(is)
            endif

          endif

          i1 = pgroup%ins(is) + pgroup%nps(is)
          i2 = ins_init(is) + nps_init(is) - 1
          if (isrightmostproc .and. i2 >= i1) then

            if (debug) then
              --- Check if a particle is outside of the particle grid but
              --- still within the field grid. This should never happen.
              do ip=i1,i2
                if (lrz) then
                  ztemp = sqrt(pgroup%xp(ip)**2 + pgroup%yp(ip)**2)
                else if (labs) then
                  ztemp = abs(zz(ip) - zgrid)
                else
                  ztemp = zz(ip) - zgrid
                endif

                if (zpmaxlocal < ztemp .and. ztemp <= zmmax) then
                  if (axis == 0) then
                    call kaboom("particleboundaries_parallel: there is
     &something wrong with the loadbalancing - a particle was found between
     &xmmax and xpmaxlocal")
                  else if (axis == 1) then
                    call kaboom("particleboundaries_parallel: there is
     &something wrong with the loadbalancing - a particle was found between
     &ymmax and ypmaxlocal")
                  else if (axis == 2) then
                    call kaboom("particleboundaries_parallel: there is
     &something wrong with the loadbalancing - a particle was found between
     &zmmax and zpmaxlocal")
                  endif
                endif
              enddo
            endif

            if (pboundright==periodic) then
              --- The position is adjusted, and the data will still be passed
              --- to the appropriate neighbor.
              syslen = zmmax - zmmin
              do ip=i1,i2
                zz(ip) = zz(ip) - syslen*int(((zz(ip) - zgrid) - zmmin)/syslen)
              enddo

            elseif (pboundright==absorb) then
              --- gaminv is set to flag the particles as lost. The ins and nps
              --- counts are reset to include the lost particles so that they
              --- will be handled properly. Check if there are any flagged
              --- particles.
              if (lcountaslost) then
                pgroup%gaminv(i1:i2) = 0.
              else
                pgroup%gaminv(i1:i2) = -1.
              endif
              pgroup%nps(is) = ins_init(is) + nps_init(is) - pgroup%ins(is)

            elseif (pboundright==reflect) then
              --- The position is adjusted, but the data is not sent.
              --- zmmin and zmmax are actually xmmin and xmmax here.
              --- The ins and nps are reset to include them.
              if (lrz) then
                do ip=i1,i2
                  rr = sqrt(pgroup%xp(ip)**2 + pgroup%yp(ip)**2)
                  if (rr > zmmax) then
                    pgroup%xp(ip) = (2.*zmmax - rr)*(pgroup%xp(ip)/rr)
                    pgroup%yp(ip) = (2.*zmmax - rr)*(pgroup%yp(ip)/rr)
                  else ! rr == zmmax
                    --- Shift the particle in bounds by a number that is small
                    --- compared to the dimensions of the grid.
                    rrin = zmmax - (zmmax - zmmin)*1.e-12
                    pgroup%xp(ip) = rrin*(pgroup%xp(ip)/rr)
                    pgroup%yp(ip) = rrin*(pgroup%yp(ip)/rr)
                  endif
                  pgroup%uxp(ip) = -pgroup%uxp(ip)
                  pgroup%uyp(ip) = -pgroup%uyp(ip)
                enddo
              else
                do ip=i1,i2
                  if (zz(ip) > zmmax) then
                    zz(ip) = zmmax - ((zz(ip) - zgrid) - zmmax) + zgrid
                  else ! zz == zmmax
                    --- Shift the particle in bounds by a number that is small
                    --- compared to the dimensions of the grid.
                    zz(ip) = zmmax - (zmmax - zmmin)*1.e-12 + zgrid
                  endif
                  uz(ip) = -uz(ip)
                enddo
              endif
              pgroup%nps(is) = ins_init(is) + nps_init(is) - pgroup%ins(is)

            endif
          endif

        --- End of loop over species
        enddo

#ifdef VAMPIR
      call VTEND(10,IERR)
      call VTBEGIN(20,IERR)
#endif

  Here is what happens...
   First, particle data to be sent is copied into temporary buffers.
   Second, exchange number of particles to be sent in each direction.
   Third, make sure there is room in particle arrays for incoming data.
   Fourth, for each direction, create an mpi type which has the
           explicit addresses of the places where the particle data goes.
   Fifth, exchange the data.
 
  Step four is done so that the incoming data does not have to be buffered,
  but can be received directly into the correct memory locations.
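  In the code below, the data is instead received into the temporary buffers
  buffer3 and buffer4 and then copied into the particle arrays; the version
  that builds a derived MPI type with explicit addresses is the commented-out
  block near the end of this subroutine.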

        --- Make sure these are zero in case they are not set below.
        nsendleft = 0
        nsendright = 0
        nsendleftsum = 0
        nsendrightsum = 0

        --- Number of quantities to be exchanged. Normally it is thirteen,
        --- the positions, velocities, gaminv, and the fields,
        --- but when pgroup%npid > 0, it is more since pid is also exchanged.
        nquant = 13
        if (pgroup%npid > 0) nquant = nquant + pgroup%npid

        --- Copy particles being sent to the left to buffer1.
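        --- Within buffer1, each species occupies a contiguous block of
        --- nquant*nn values: nn values each of xp, yp, zp, uxp, uyp, uzp,
        --- gaminv, ex, ey, ez, bx, by, bz, followed by the npid pid columns.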
        if (pboundleft==periodic .or. .not. isleftmostproc) then
          nsendleft(is1:is2) = pgroup%ins(is1:is2) - ins_init(is1:is2)
          nsendleftsum = sum(nsendleft(is1:is2))
          if (nquant*nsendleftsum > b1size) then
            b1size = nquant*nsendleftsum
            call gchange("Databuffers",0)
          endif
          if (nsendleftsum > 0) then
            ii = 1
            do is=is1,is2
              ip = ins_init(is)
              nn = nsendleft(is)
              if (nn > 0) then
                buffer1(ii      :ii+   nn-1) = pgroup%xp(ip:ip+nn-1)
                buffer1(ii+   nn:ii+ 2*nn-1) = pgroup%yp(ip:ip+nn-1)
                buffer1(ii+ 2*nn:ii+ 3*nn-1) = pgroup%zp(ip:ip+nn-1)
                buffer1(ii+ 3*nn:ii+ 4*nn-1) = pgroup%uxp(ip:ip+nn-1)
                buffer1(ii+ 4*nn:ii+ 5*nn-1) = pgroup%uyp(ip:ip+nn-1)
                buffer1(ii+ 5*nn:ii+ 6*nn-1) = pgroup%uzp(ip:ip+nn-1)
                buffer1(ii+ 6*nn:ii+ 7*nn-1) = pgroup%gaminv(ip:ip+nn-1)
                buffer1(ii+ 7*nn:ii+ 8*nn-1) = pgroup%ex(ip:ip+nn-1)
                buffer1(ii+ 8*nn:ii+ 9*nn-1) = pgroup%ey(ip:ip+nn-1)
                buffer1(ii+ 9*nn:ii+10*nn-1) = pgroup%ez(ip:ip+nn-1)
                buffer1(ii+10*nn:ii+11*nn-1) = pgroup%bx(ip:ip+nn-1)
                buffer1(ii+11*nn:ii+12*nn-1) = pgroup%by(ip:ip+nn-1)
                buffer1(ii+12*nn:ii+13*nn-1) = pgroup%bz(ip:ip+nn-1)
                do ipid=1,pgroup%npid
                  buffer1(ii+(12+ipid)*nn:ii+(13+ipid)*nn-1) = pgroup%pid(ip:ip+nn-1,ipid)
                enddo
                ii = ii + nn*nquant
              endif
            enddo
          endif
        endif

        --- Copy particles being sent to the right to buffer2.
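        --- buffer2 uses the same per-species layout as buffer1.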
        if (pboundright==periodic .or. .not. isrightmostproc) then
          nsendright(is1:is2) = ins_init(is1:is2) + nps_init(is1:is2)
     &                          - pgroup%ins(is1:is2) - pgroup%nps(is1:is2)
          nsendrightsum = sum(nsendright(is1:is2))
          if (nquant*nsendrightsum > b2size) then
            b2size = nquant*nsendrightsum
            call gchange("Databuffers",0)
          endif
          if (nsendrightsum > 0) then
            ii = 1
            do is=is1,is2
              ip = pgroup%ins(is) + pgroup%nps(is)
              nn = nsendright(is)
              if (nn > 0) then
                buffer2(ii      :ii+   nn-1) = pgroup%xp(ip:ip+nn-1)
                buffer2(ii+   nn:ii+ 2*nn-1) = pgroup%yp(ip:ip+nn-1)
                buffer2(ii+ 2*nn:ii+ 3*nn-1) = pgroup%zp(ip:ip+nn-1)
                buffer2(ii+ 3*nn:ii+ 4*nn-1) = pgroup%uxp(ip:ip+nn-1)
                buffer2(ii+ 4*nn:ii+ 5*nn-1) = pgroup%uyp(ip:ip+nn-1)
                buffer2(ii+ 5*nn:ii+ 6*nn-1) = pgroup%uzp(ip:ip+nn-1)
                buffer2(ii+ 6*nn:ii+ 7*nn-1) = pgroup%gaminv(ip:ip+nn-1)
                buffer2(ii+ 7*nn:ii+ 8*nn-1) = pgroup%ex(ip:ip+nn-1)
                buffer2(ii+ 8*nn:ii+ 9*nn-1) = pgroup%ey(ip:ip+nn-1)
                buffer2(ii+ 9*nn:ii+10*nn-1) = pgroup%ez(ip:ip+nn-1)
                buffer2(ii+10*nn:ii+11*nn-1) = pgroup%bx(ip:ip+nn-1)
                buffer2(ii+11*nn:ii+12*nn-1) = pgroup%by(ip:ip+nn-1)
                buffer2(ii+12*nn:ii+13*nn-1) = pgroup%bz(ip:ip+nn-1)
                do ipid=1,pgroup%npid
                  buffer2(ii+(12+ipid)*nn:ii+(13+ipid)*nn-1) = pgroup%pid(ip:ip+nn-1,ipid)
                enddo
                ii = ii + nn*nquant
              endif
            enddo
          endif
        endif

#ifdef VAMPIR
      call VTEND(20,IERR)
      call VTBEGIN(30,IERR)
#endif
        --- Now send the data.

        --- First send the number of particles to be sent
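        --- in each direction, per species, so that each processor knows how
        --- much data to expect before the particle data itself is exchanged.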
        call MPI_SENDRECV(nsendleft(is1:is2),int(is2-is1+1,MPIISZ),
     &                    MPI_INTEGER,left_pe,messid,
     &                    nrecvright(is1:is2),int(is2-is1+1,MPIISZ),
     &                    MPI_INTEGER,right_pe,messid,
     &                    comm_world_mpiisz,mpistatus,mpierror)
        call MPI_SENDRECV(nsendright(is1:is2),int(is2-is1+1,MPIISZ),
     &                    MPI_INTEGER,right_pe,messid,
     &                    nrecvleft(is1:is2),int(is2-is1+1,MPIISZ),
     &                    MPI_INTEGER,left_pe,messid,
     &                    comm_world_mpiisz,mpistatus,mpierror)

#ifdef VAMPIR
      call VTEND(30,IERR)
      call VTBEGIN(40,IERR)
#endif
#ifdef VAMPIR
      call VTEND(40,IERR)
      call VTBEGIN(50,IERR)
#endif

        nrecvleftsum = sum(nrecvleft(is1:is2))
        nrecvrightsum = sum(nrecvright(is1:is2))
        if (nquant*nrecvleftsum > b3size) then
          b3size = nquant*nrecvleftsum
          call gchange("Databuffers",0)
        endif
        if (nquant*nrecvrightsum > b4size) then
          b4size = nquant*nrecvrightsum
          call gchange("Databuffers",0)
        endif

        call MPI_SENDRECV(buffer2,nquant*nsendrightsum,MPI_DOUBLE_PRECISION,
     &                    right_pe,messid,
     &                    buffer3,nquant*nrecvleftsum,MPI_DOUBLE_PRECISION,
     &                    left_pe,messid,
     &                    comm_world_mpiisz,mpistatus,mpierror)
        call MPI_SENDRECV(buffer1,nquant*nsendleftsum,MPI_DOUBLE_PRECISION,
     &                    left_pe,messid,
     &                    buffer4,nquant*nrecvrightsum,MPI_DOUBLE_PRECISION,
     &                    right_pe,messid,
     &                    comm_world_mpiisz,mpistatus,mpierror)

#ifdef VAMPIR
      call VTEND(50,IERR)
      call VTBEGIN(60,IERR)
#endif

        il = 1
        ir = 1
        do is=is1,is2

          --- Make sure that there is space for the incoming data.
          --- chckpart is only called if the total amount of space
          --- available is not enough for the incoming particles.
          --- Note that chckpart must be called in this loop - if it is
          --- called for all species at once, it could double count the
          --- available space between species. If, for example, there were
          --- 10 empty elements between species and each needed 8 elements,
          --- chckpart wouldn't shift anything since it would see that the 10
          --- would be enough for the 8 for each species, not knowing that
          --- 16 spaces are really needed.

          --- ilower is the lowest usable array location.
          ilower = pgroup%ipmax(is-1) + 1

          --- ihigher is the highest usable array location.
          ihigher = pgroup%ipmax(is)

          --- nn is the total space available for new particles: the
          --- total available range for the species minus the number of
          --- existing particles.
          nn = ihigher - ilower + 1 - pgroup%nps(is)
          if (nn < nrecvleft(is)+nrecvright(is)) then
            call chckpart(pgroup,int(is,ISZ),
     &                    int(nrecvleft(is),ISZ),int(nrecvright(is),ISZ))
          endif

          if (nrecvleft(is) > 0) then
            nn = nrecvleft(is)
            if (nn > 0) then

              --- This special value is needed since when npid == 0, its
              --- value would be out of bounds, i.e. > b3size. Note that in
              --- addpart, the pid data is only accessed if npid > 0.
              if (pgroup%npid > 0) then
                iii13 = il+13*nn
              else
                iii13 = il+(13+pgroup%npid)*nn-1  - 1
              endif

              call addpart(pgroup,nn,pgroup%npid,
     &                     buffer3(il      :il+   nn-1),
     &                     buffer3(il+   nn:il+ 2*nn-1),
     &                     buffer3(il+ 2*nn:il+ 3*nn-1),
     &                     buffer3(il+ 3*nn:il+ 4*nn-1),
     &                     buffer3(il+ 4*nn:il+ 5*nn-1),
     &                     buffer3(il+ 5*nn:il+ 6*nn-1),
     &                     buffer3(il+ 6*nn:il+ 7*nn-1),
     &                     buffer3(il+ 7*nn:il+ 8*nn-1),
     &                     buffer3(il+ 8*nn:il+ 9*nn-1),
     &                     buffer3(il+ 9*nn:il+10*nn-1),
     &                     buffer3(il+10*nn:il+11*nn-1),
     &                     buffer3(il+11*nn:il+12*nn-1),
     &                     buffer3(il+12*nn:il+13*nn-1),
     &                     buffer3(iii13   :il+(13+pgroup%npid)*nn-1),
     &                     is,
     &                     .true.,0.,0.,0.,0.,0.,0.,.false.,.false.,.false.,
     &                     .true.,.true.,.false.,.false.)

              ip = pgroup%ins(is) - nn
              pgroup%xp(ip:ip+nn-1)     = buffer3(il      :il+   nn-1)
              pgroup%yp(ip:ip+nn-1)     = buffer3(il+   nn:il+ 2*nn-1)
              pgroup%zp(ip:ip+nn-1)     = buffer3(il+ 2*nn:il+ 3*nn-1)
              pgroup%uxp(ip:ip+nn-1)    = buffer3(il+ 3*nn:il+ 4*nn-1)
              pgroup%uyp(ip:ip+nn-1)    = buffer3(il+ 4*nn:il+ 5*nn-1)
              pgroup%uzp(ip:ip+nn-1)    = buffer3(il+ 5*nn:il+ 6*nn-1)
              pgroup%gaminv(ip:ip+nn-1) = buffer3(il+ 6*nn:il+ 7*nn-1)
              pgroup%ex(ip:ip+nn-1)     = buffer3(il+ 7*nn:il+ 8*nn-1)
              pgroup%ey(ip:ip+nn-1)     = buffer3(il+ 8*nn:il+ 9*nn-1)
              pgroup%ez(ip:ip+nn-1)     = buffer3(il+ 9*nn:il+10*nn-1)
              pgroup%bx(ip:ip+nn-1)     = buffer3(il+10*nn:il+11*nn-1)
              pgroup%by(ip:ip+nn-1)     = buffer3(il+11*nn:il+12*nn-1)
              pgroup%bz(ip:ip+nn-1)     = buffer3(il+12*nn:il+13*nn-1)
              do ipid=1,pgroup%npid
                pgroup%pid(ip:ip+nn-1,ipid) = buffer3(il+(12+ipid)*nn:il+(13+ipid)*nn-1)
              enddo

              --- Grab pointers to incoming data coordinates.
              --- The buffer needs to be used, since when the new particle
              --- data is copied into pgroup, the data might be split up and
              --- could be noncontiguous.
              if (axis == 0) then
                zbuff => buffer3(il      :il+   nn-1) ! x
              else if (axis == 1) then
                zbuff => buffer3(il+   nn:il+ 2*nn-1) ! y
              else if (axis == 2) then
                zbuff => buffer3(il+ 2*nn:il+ 3*nn-1) ! z
              endif
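              --- Check whether any of the received particles are still beyond
              --- the local domain; if so, another pass of the exchange loop is
              --- needed to pass them on to the next processor.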

              if (lrz) then
                --- ztemp = sqrt(x**2 + y**2)
                ztemp = maxval(sqrt(buffer3(il      :il+   nn-1)**2 +
     &                              buffer3(il+   nn:il+ 2*nn-1)**2))
              elseif (labs) then
                ztemp = maxval(abs(zbuff)) - zgrid
              else
                ztemp = maxval(zbuff) - zgrid
              endif
              if (ztemp > zpmaxlocal) then
                ldoagain = .true.
              endif

              il = il + nquant*nn

            endif
          endif

          if (nrecvright(is) > 0) then
            nn = nrecvright(is)
            if (nn > 0) then

              --- This special value is needed since when npid == 0, its
              --- value would be out of bounds, i.e. > b4size. Note that in
              --- addpart, the pid data is only accessed if npid > 0.
              if (pgroup%npid > 0) then
                iii13 = ir+13*nn
              else
                iii13 = ir+(13+pgroup%npid)*nn-1  - 1
              endif

              call addpart(pgroup,nn,pgroup%npid,
     &                     buffer4(ir      :ir+   nn-1),
     &                     buffer4(ir+   nn:ir+ 2*nn-1),
     &                     buffer4(ir+ 2*nn:ir+ 3*nn-1),
     &                     buffer4(ir+ 3*nn:ir+ 4*nn-1),
     &                     buffer4(ir+ 4*nn:ir+ 5*nn-1),
     &                     buffer4(ir+ 5*nn:ir+ 6*nn-1),
     &                     buffer4(ir+ 6*nn:ir+ 7*nn-1),
     &                     buffer4(ir+ 7*nn:ir+ 8*nn-1),
     &                     buffer4(ir+ 8*nn:ir+ 9*nn-1),
     &                     buffer4(ir+ 9*nn:ir+10*nn-1),
     &                     buffer4(ir+10*nn:ir+11*nn-1),
     &                     buffer4(ir+11*nn:ir+12*nn-1),
     &                     buffer4(ir+12*nn:ir+13*nn-1),
     &                     buffer4(iii13   :ir+(13+pgroup%npid)*nn-1),
     &                     is,
     &                     .true.,0.,0.,0.,0.,0.,0.,.false.,.false.,.false.,
     &                     .true.,.true.,.false.,.true.)

              ip = pgroup%ins(is) + pgroup%nps(is)
              pgroup%xp(ip:ip+nn-1)     = buffer4(ir      :ir+ 1*nn-1)
              pgroup%yp(ip:ip+nn-1)     = buffer4(ir+   nn:ir+ 2*nn-1)
              pgroup%zp(ip:ip+nn-1)     = buffer4(ir+ 2*nn:ir+ 3*nn-1)
              pgroup%uxp(ip:ip+nn-1)    = buffer4(ir+ 3*nn:ir+ 4*nn-1)
              pgroup%uyp(ip:ip+nn-1)    = buffer4(ir+ 4*nn:ir+ 5*nn-1)
              pgroup%uzp(ip:ip+nn-1)    = buffer4(ir+ 5*nn:ir+ 6*nn-1)
              pgroup%gaminv(ip:ip+nn-1) = buffer4(ir+ 6*nn:ir+ 7*nn-1)
              pgroup%ex(ip:ip+nn-1)     = buffer4(ir+ 7*nn:ir+ 8*nn-1)
              pgroup%ey(ip:ip+nn-1)     = buffer4(ir+ 8*nn:ir+ 9*nn-1)
              pgroup%ez(ip:ip+nn-1)     = buffer4(ir+ 9*nn:ir+10*nn-1)
              pgroup%bx(ip:ip+nn-1)     = buffer4(ir+10*nn:ir+11*nn-1)
              pgroup%by(ip:ip+nn-1)     = buffer4(ir+11*nn:ir+12*nn-1)
              pgroup%bz(ip:ip+nn-1)     = buffer4(ir+12*nn:ir+13*nn-1)
              do ipid=1,pgroup%npid
                pgroup%pid(ip:ip+nn-1,ipid) = buffer4(ir+(12+ipid)*nn:ir+(13+ipid)*nn-1)
              enddo

              --- Grab pointers to incoming data coordinates.
              --- The buffer needs to be used, since when the new particle
              --- data is copied into pgroup, the data might be split up and
              --- could be noncontiguous.
              if (axis == 0) then
                zbuff => buffer4(ir      :ir+   nn-1) ! x
              else if (axis == 1) then
                zbuff => buffer4(ir+   nn:ir+ 2*nn-1) ! y
              else if (axis == 2) then
                zbuff => buffer4(ir+ 2*nn:ir+ 3*nn-1) ! z
              endif

              if (lrz) then
                --- ztemp = sqrt(x**2 + y**2)
                ztemp = maxval(sqrt(buffer4(ir      :ir+ 1*nn-1)**2 +
     &                              buffer4(ir+   nn:ir+ 2*nn-1)**2))
              elseif (labs) then
                ztemp = minval(abs(zbuff)) - zgrid
              else
                ztemp = minval(zbuff) - zgrid
              endif
              if (ztemp < zpminlocal) then
                ldoagain = .true.
              endif

              ir = ir + nquant*nn

            endif
          endif
        enddo

#ifdef VAMPIR
      call VTEND(60,IERR)
      call VTBEGIN(70,IERR)
#endif
        call parallellor(ldoagain)
#ifdef VAMPIR
      call VTEND(70,IERR)
#endif

      enddo

  What follows (but is commented out) is the preferred version, but
  it causes a core dump for some unknown reason.
  It would also need to be adjusted to send all species at once.
        allocate(blocklengths(nquant),displacements(nquant))
        --- Create new type for receiving particles from left
        do ii=1,nquant
          blocklengths(ii) = nrecvleft
        enddo
        ip = pgroup%ins(is) - nrecvleft
        call MPI_ADDRESS(pgroup%xp(ip),displacements(1),mpierror)
        call MPI_ADDRESS(pgroup%yp(ip),displacements(2),mpierror)
        call MPI_ADDRESS(pgroup%zp(ip),displacements(3),mpierror)
        call MPI_ADDRESS(pgroup%uxp(ip),displacements(4),mpierror)
        call MPI_ADDRESS(pgroup%uyp(ip),displacements(5),mpierror)
        call MPI_ADDRESS(pgroup%uzp(ip),displacements(6),mpierror)
        call MPI_ADDRESS(pgroup%gaminv(ip),displacements(7),mpierror)
        call MPI_ADDRESS(pgroup%ex(ip),displacements(8),mpierror)
        call MPI_ADDRESS(pgroup%ey(ip),displacements(9),mpierror)
        call MPI_ADDRESS(pgroup%ez(ip),displacements(10),mpierror)
        call MPI_ADDRESS(pgroup%bx(ip),displacements(11),mpierror)
        call MPI_ADDRESS(pgroup%by(ip),displacements(12),mpierror)
        call MPI_ADDRESS(pgroup%bz(ip),displacements(13),mpierror)
        do ipid=1,pgroup%npid
          call MPI_ADDRESS(pgroup%pid(ip,ipid),displacements(nquant+ipid),mpierror)
        enddo
        call MPI_TYPE_HINDEXED(nquant,blocklengths,displacements,
     &                         MPI_DOUBLE_PRECISION,mpinewtype,mpierror)
        call MPI_TYPE_COMMIT(mpinewtype,mpierror)

        --- Pass particle data to the right
        call MPI_SENDRECV(buffer2,nquant*nsendright,MPI_DOUBLE_PRECISION,right_pe,0,
     &                    MPI_BOTTOM,1,mpinewtype,left_pe,0,
     &                    comm_world_mpiisz,mpistatus,mpierror)
        pgroup%ins(is) = pgroup%ins(is) - nrecvleft
        pgroup%nps(is) = pgroup%nps(is) + nrecvleft
        call MPI_TYPE_FREE(mpinewtype,mpierror)

        --- Create new type for receiving particles from right
        do ii=1,nquant
          blocklengths(ii) = nrecvright
        enddo
        ip = pgroup%ins(is) + pgroup%nps(is)
        call MPI_ADDRESS(pgroup%xp(ip),displacements(1),mpierror)
        call MPI_ADDRESS(pgroup%yp(ip),displacements(2),mpierror)
        call MPI_ADDRESS(pgroup%zp(ip),displacements(3),mpierror)
        call MPI_ADDRESS(pgroup%uxp(ip),displacements(4),mpierror)
        call MPI_ADDRESS(pgroup%uyp(ip),displacements(5),mpierror)
        call MPI_ADDRESS(pgroup%uzp(ip),displacements(6),mpierror)
        call MPI_ADDRESS(pgroup%gaminv(ip),displacements(7),mpierror)
        call MPI_ADDRESS(pgroup%ex(ip),displacements(8),mpierror)
        call MPI_ADDRESS(pgroup%ey(ip),displacements(9),mpierror)
        call MPI_ADDRESS(pgroup%ez(ip),displacements(10),mpierror)
        call MPI_ADDRESS(pgroup%bx(ip),displacements(11),mpierror)
        call MPI_ADDRESS(pgroup%by(ip),displacements(12),mpierror)
        call MPI_ADDRESS(pgroup%bz(ip),displacements(13),mpierror)
        do ipid=1,pgroup%npid
          call MPI_ADDRESS(pgroup%pid(ip,ipid),displacements(nquant+ipid),mpierror)
        enddo
        call MPI_TYPE_HINDEXED(nquant,blocklengths,displacements,
     &                         MPI_DOUBLE_PRECISION,mpinewtype,mpierror)
        call MPI_TYPE_COMMIT(mpinewtype,mpierror)

        --- Pass particle data to the left
        call MPI_SENDRECV(buffer1,nquant*nsendleft,MPI_DOUBLE_PRECISION,left_pe,0,
     &                    MPI_BOTTOM,1,mpinewtype,right_pe,0,
     &                    comm_world_mpiisz,mpistatus,mpierror)
        pgroup%nps(is) = pgroup%nps(is) + nrecvright
        call MPI_TYPE_FREE(mpinewtype,mpierror)
        deallocate(blocklengths,displacements)

!$OMP MASTER
      if (ltoptimesubs) timezpartbnd_slave = timezpartbnd_slave + wtime() - substarttime
!$OMP END MASTER

      return
      end

[reorgparticles]
      subroutine reorgparticles_parallel(pgroup,l4symtry,l2symtry,lrz)
      use ParticleGroupmodule
      use Subtimerstop
      use InPart
      use Picglb
      use Parallel
      type(ParticleGroup):: pgroup
      logical(ISZ):: l4symtry,l2symtry,lrz

  Do all-to-all communication to pass particles to where they belong. This
  sorts through all of the particles first, then does all of the communication
  at once.
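  The steps are: build coarse lookup tables that map positions to processors,
  sort the particle data into per-processor blocks of a temporary array,
  exchange the per-species counts with MPI_ALLTOALL and the particle data with
  MPI_ALLTOALLV, resize the particle arrays if needed, and finally copy the
  received data back into the particle arrays.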

      integer(MPIISZ),allocatable:: xprocs(:),yprocs(:),zprocs(:)
      integer(MPIISZ):: nxp,nyp,nzp
      real(kind=8):: dxp,dyp,dzp
      integer(MPIISZ):: ix,iy,iz,ip
      real(kind=8):: xx,yy,zz
      integer(MPIISZ):: ipx1,ipx2,ipy1,ipy2,ipz1,ipz2,ipx,ipy,ipz
      real(kind=8),allocatable:: ptemp(:,:),precv(:,:)
      integer(MPIISZ),allocatable:: pprocs(:)
      integer(MPIISZ):: pins(0:pgroup%ns-1,0:nprocs-1)
      integer(MPIISZ):: pnps(0:pgroup%ns-1,0:nprocs-1)
      integer(MPIISZ):: piii(0:pgroup%ns-1,0:nprocs-1)
      integer(MPIISZ):: inrecv(0:pgroup%ns-1,0:nprocs-1)
      integer(MPIISZ):: nprecv(0:pgroup%ns-1,0:nprocs-1)
      integer(MPIISZ):: pnp,pnpmax,ncoords,ii,js,pip,ip1,ip2,iitemp,ii1,ii2
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      integer(MPIISZ):: sendcounts(0:nprocs-1),sdispls(0:nprocs-1)
      integer(MPIISZ):: recvcounts(0:nprocs-1),rdispls(0:nprocs-1)
      integer(MPIISZ):: allocerror
      logical(ISZ):: skipx,skipy,skipz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world

      --- First, create arrays that specify which processor owns which
      --- location. This is used to make the sorting of particles to
      --- processors efficient. Using these arrays avoids having to loop
      --- over every processor to find which one each particle is in.
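      --- xprocs(ix) is the index of the first processor whose x domain
      --- extends up to or beyond the position xpmin + ix*dxp (and similarly
      --- for yprocs and zprocs), so each particle only needs to be checked
      --- against the processors from xprocs(ix) to xprocs(ix+1).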
      if (fsdecomp%nxglobal > 0) then
        dxp = minval(ppdecomp%xmax-ppdecomp%xmin)/10.
        nxp = min(100000,int((xpmax-xpmin)/dxp))
        dxp = (xpmax - xpmin)/nxp
        skipx = .false.
      else
        nxp = 1
        dxp = 1.
        skipx = .true.
      endif
      if (fsdecomp%nyglobal > 0) then
        dyp = minval(ppdecomp%ymax-ppdecomp%ymin)/10.
        nyp = min(100000,int((ypmax-ypmin)/dyp))
        dyp = (ypmax - ypmin)/nyp
        skipy = .false.
      else
        --- Set nyp to 1, so that ipy2 is OK.
        nyp = 1
        dyp = 1.
        skipy = .true.
      endif
      if (fsdecomp%nzglobal > 0) then
        dzp = minval(ppdecomp%zmax-ppdecomp%zmin)/10.
        nzp = min(100000,int((zpmax-zpmin)/dzp))
        dzp = (zpmax - zpmin)/nzp
        skipz = .false.
      else
        nzp = 1
        dzp = 1.
        skipz = .true.
      endif

      allocate(xprocs(0:nxp),yprocs(0:nyp),zprocs(0:nzp),stat=allocerror)
      if (allocerror /= 0) then
        print*,"reorgparticles_parallel: allocation error ",allocerror,
     &         ": could not allocate x,y,zprocs to shape ",nxp,nyp,nzp
        call kaboom("reorgparticles_parallel: allocation error")
        return
      endif

      ip = 0
      do ix=0,nxp
        xx = xpmin + ix*dxp
        do while (xx > ppdecomp%xmax(ip) .and. ip < nxprocs-1)
          ip = ip + 1
        enddo
        xprocs(ix) = ip
      enddo

      ip = 0
      do iy=0,nyp
        yy = ypmin + iy*dyp
        do while (yy > ppdecomp%ymax(ip) .and. ip < nyprocs-1)
          ip = ip + 1
        enddo
        yprocs(iy) = ip
      enddo

      ip = 0
      do iz=0,nzp
        zz = zpmin + iz*dzp
        do while (zz > ppdecomp%zmax(ip) .and. ip < nzprocs-1)
          ip = ip + 1
        enddo
        zprocs(iz) = ip
      enddo

      --- Next, create temporary arrays to hold the sorted particles.
      pnp = sum(pgroup%nps)
      ncoords = 13 + pgroup%npid
      pnpmax = max(int(1,MPIISZ),pnp)
      allocate(ptemp(0:ncoords-1,pnpmax),pprocs(pnpmax),stat=allocerror)
      if (allocerror /= 0) then
        print*,"reorgparticles_parallel: allocation error ",allocerror,
     &         ": could not allocate ptemp and pprocs to shape ",ncoords,pnpmax
        call kaboom("reorgparticles_parallel: allocation error")
        return
      endif

      --- Loop through the particles, finding to which processor each particle
      --- belongs.
      pip = 0
      pnps = 0
      pprocs = 1000000
      do js=0,pgroup%ns-1
        PLOOP: do ii=pgroup%ins(js+1),pgroup%ins(js+1)+pgroup%nps(js+1)-1
          xx = pgroup%xp(ii)
          yy = pgroup%yp(ii)
          zz = pgroup%zp(ii) - zbeam
          if (l4symtry) xx = abs(xx)
          if (l4symtry .or. l2symtry) yy = abs(yy)
          if (lrz) then
            xx = sqrt(xx**2 + yy**2)
            yy = ypmin
          else if (skipy) then
            yy = ypmin
          endif
          ix = int((xx - xpmin)/dxp)
          iy = int((yy - ypmin)/dyp)
          iz = int((zz - zpmin)/dzp)
          ipx1 = xprocs(ix)
          ipx2 = xprocs(ix+1)
          ipy1 = yprocs(iy)
          ipy2 = yprocs(iy+1)
          ipz1 = zprocs(iz)
          ipz2 = zprocs(iz+1)
          pip = pip + 1
          do ipz=ipz1,ipz2
            do ipy=ipy1,ipy2
              do ipx=ipx1,ipx2
                if ((ppdecomp%xmin(ipx) <= xx .and. xx < ppdecomp%xmax(ipx)
     &               .or. skipx) .and.
     &              (ppdecomp%ymin(ipy) <= yy .and. yy < ppdecomp%ymax(ipy)
     &               .or. skipy) .and.
     &              (ppdecomp%zmin(ipz) <= zz .and. zz < ppdecomp%zmax(ipz)
     &               .or. skipz)) then
                  ip = convertindextoproc(ipx,ipy,ipz,
     &                                    ppdecomp%nxprocs,
     &                                    ppdecomp%nyprocs,
     &                                    ppdecomp%nzprocs)
                  --- This is the same expression as in convertindextoproc,
                  --- but is written out here for optimization.
                  ip = ipx + ipy*ppdecomp%nxprocs +
     &                       ipz*ppdecomp%nxprocs*ppdecomp%nyprocs
                  pprocs(pip) = ip
                  pnps(js,ip) = pnps(js,ip) + 1
                  cycle PLOOP
                endif
              enddo
            enddo
          enddo
          print*,"BADx ",js,ii,xx,ppdecomp%xmin(ipx1),ppdecomp%xmax(ipx2),ipx1,ipx2
          print*,"BADy ",js,ii,yy,ppdecomp%ymin(ipy1),ppdecomp%ymax(ipy2),ipy1,ipy2
          print*,"BADz ",js,ii,zz,ppdecomp%zmin(ipz1),ppdecomp%zmax(ipz2),ipz1,ipz2
        enddo PLOOP
      enddo

      --- Calculate starting index of particle groups for each species and
      --- processor
      pins(0,0) = 1
      do ip=0,nprocs-1
        do js=0,pgroup%ns-1
          if (js == 0) then
            if (ip == 0) then
              pins(js,ip) = 1
            else
              pins(js,ip) = pins(ns-1,ip-1) + pnps(ns-1,ip-1)
            endif
          else
            pins(js,ip) = pins(js-1,ip) + pnps(js-1,ip)
          endif
        enddo
      enddo

      --- Copy sorted particle data into the temporary array
      pip = 0
      piii = pins
      do js=0,pgroup%ns-1
        do ii=pgroup%ins(js+1),pgroup%ins(js+1)+pgroup%nps(js+1)-1
          pip = pip + 1
          ip = pprocs(pip)
          if (ip == 1000000) print*,"PIP ",js,ii,pgroup%xp(ii),pgroup%yp(ii),pgroup%zp(ii)
          iitemp = piii(js,ip)
          piii(js,ip) = piii(js,ip) + 1
          ptemp( 0,iitemp) = pgroup%xp(ii)
          ptemp( 1,iitemp) = pgroup%yp(ii)
          ptemp( 2,iitemp) = pgroup%zp(ii)
          ptemp( 3,iitemp) = pgroup%uxp(ii)
          ptemp( 4,iitemp) = pgroup%uyp(ii)
          ptemp( 5,iitemp) = pgroup%uzp(ii)
          ptemp( 6,iitemp) = pgroup%gaminv(ii)
          ptemp( 7,iitemp) = pgroup%ex(ii)
          ptemp( 8,iitemp) = pgroup%ey(ii)
          ptemp( 9,iitemp) = pgroup%ez(ii)
          ptemp(10,iitemp) = pgroup%bx(ii)
          ptemp(11,iitemp) = pgroup%by(ii)
          ptemp(12,iitemp) = pgroup%bz(ii)
          if (pgroup%npid > 0) then
            ptemp(13:,iitemp) = pgroup%pid(ii,:)
          endif
        enddo
      enddo

      --- Calculate pins for data set sent to each processor relative to 0
      do ip=0,nprocs-1
        pins(:,ip) = pins(:,ip) - pins(0,ip)
      enddo

      --- Exchange the number of particles which are to be sent to each
      --- processor
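      --- pins holds the per-species offsets within each processor's block of
      --- ptemp and pnps the per-species counts; inrecv and nprecv receive the
      --- equivalent values from every other processor.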
      call MPI_ALLTOALL(pins,int(pgroup%ns,MPIISZ),MPI_INTEGER,
     &                  inrecv,int(pgroup%ns,MPIISZ),MPI_INTEGER,
     &                  comm_world_mpiisz,mpierror)
      call MPI_ALLTOALL(pnps,int(pgroup%ns,MPIISZ),MPI_INTEGER,
     &                  nprecv,int(pgroup%ns,MPIISZ),MPI_INTEGER,
     &                  comm_world_mpiisz,mpierror)

      --- Resize particle arrays if necessary. If there is more room available
      --- than incoming particles, then spread the particles out by giving each
      --- species a fraction equal to its number of particles over the total.
      --- If there is not enough room, then add extra space and spread it out
      --- among the species the same way. Ensure that npmax == sum(np_s) so
      --- that if more space is not needed, an unnecessary reallocation is not
      --- done.
      pnp = sum(nprecv)
      np_s = sum(nprecv,2)
      if (pnp > pgroup%npmax) pgroup%npmax = int(1.1*pnp)
      if (pnp > 0) then
        np_s = int(np_s*pgroup%npmax/pnp)
      else
        np_s = pgroup%npmax/ns
      endif
      np_s(ns) = pgroup%npmax - sum(np_s(1:ns-1))
      call alotpart(pgroup)
      
      --- Create space for incoming data
      pnpmax = max(int(1,MPIISZ),pnp)
      allocate(precv(0:ncoords-1,pnpmax),stat=allocerror)
      if (allocerror /= 0) then
        print*,"reorgparticles_parallel: allocation error ",allocerror,
     &         ": could not allocate precv to shape ",ncoords,pnpmax
        call kaboom("reorgparticles_parallel: allocation error")
        return
      endif

      --- Do the communication
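      --- The counts and displacements are in numbers of doubles: the number
      --- of particles going to (or coming from) each processor times the
      --- ncoords values stored per particle.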
      sendcounts = sum(pnps,1)*ncoords
      recvcounts = sum(nprecv,1)*ncoords
      sdispls(0) = 0
      rdispls(0) = 0
      do ip=1,nprocs-1
        sdispls(ip) = sdispls(ip-1) + sendcounts(ip-1)
        rdispls(ip) = rdispls(ip-1) + recvcounts(ip-1)
      enddo
      call MPI_ALLTOALLV(ptemp,sendcounts,sdispls,MPI_DOUBLE_PRECISION,
     &                   precv,recvcounts,rdispls,MPI_DOUBLE_PRECISION,
     &                   comm_world_mpiisz,mpierror)

      --- Calculate inrecv for data set sent to each processor relative
      --- to rdispls
      do ip=0,nprocs-1
        inrecv(:,ip) = inrecv(:,ip) + rdispls(ip)/ncoords + 1
      enddo

      --- Calculate starting index of particle groups for each species and
      --- processor
      inrecv(0,0) = 1
      do ip=0,nprocs-1
        do js=0,pgroup%ns-1
          if (js == 0) then
            if (ip == 0) then
              inrecv(js,ip) = 1
            else
              inrecv(js,ip) = inrecv(ns-1,ip-1) + nprecv(ns-1,ip-1)
            endif
          else
            inrecv(js,ip) = inrecv(js-1,ip) + nprecv(js-1,ip)
          endif
        enddo
      enddo

      --- Now copy the data into the particle arrays
      pgroup%nps = 0
      do js=0,pgroup%ns-1
        do ip=0,nprocs-1
          if (nprecv(js,ip) > 0) then
            ip1 = inrecv(js,ip)
            ip2 = ip1 + nprecv(js,ip) - 1
            ii1 = pgroup%ins(js+1) + pgroup%nps(js+1)
            ii2 = ii1 + nprecv(js,ip) - 1
            pgroup%nps(js+1) = pgroup%nps(js+1) + nprecv(js,ip)
            pgroup%xp(ii1:ii2)     = precv( 0,ip1:ip2)
            pgroup%yp(ii1:ii2)     = precv( 1,ip1:ip2)
            pgroup%zp(ii1:ii2)     = precv( 2,ip1:ip2)
            pgroup%uxp(ii1:ii2)    = precv( 3,ip1:ip2)
            pgroup%uyp(ii1:ii2)    = precv( 4,ip1:ip2)
            pgroup%uzp(ii1:ii2)    = precv( 5,ip1:ip2)
            pgroup%gaminv(ii1:ii2) = precv( 6,ip1:ip2)
            pgroup%ex(ii1:ii2)     = precv( 7,ip1:ip2)
            pgroup%ey(ii1:ii2)     = precv( 8,ip1:ip2)
            pgroup%ez(ii1:ii2)     = precv( 9,ip1:ip2)
            pgroup%bx(ii1:ii2)     = precv(10,ip1:ip2)
            pgroup%by(ii1:ii2)     = precv(11,ip1:ip2)
            pgroup%bz(ii1:ii2)     = precv(12,ip1:ip2)
            do ii=1,pgroup%npid
              pgroup%pid(ii1:ii2,ii) = precv(12+ii,ip1:ip2)
            enddo
          endif
        enddo
      enddo

      --- Free work space
      deallocate(xprocs,yprocs,zprocs,ptemp,precv,pprocs)

      --- Done!

!$OMP MASTER
      if (ltoptimesubs) timereorgparticles_parallel = timereorgparticles_parallel + wtime() - substarttime
!$OMP END MASTER

      return
      end

      integer(ISZ) function checkzpartbnd(pgroup)
      use ParticleGroupmodule
      use Subtimerstop
      use InPart
      use Picglb,Only: zpminlocal,zpmaxlocal
      use Parallel
      use Subcycling, Only: ndtstorho,zgridndts
      type(ParticleGroup):: pgroup
  Check if all particles are within the mesh.  Returns number of particles
  outside the mesh.
      integer(ISZ):: is,ip,nout,indts
      real(kind=8):: zgrid
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()

      nout = 0
      do is=1,pgroup%ns
        indts = ndtstorho(pgroup%ndts(is-1))
        zgrid = zgridndts(indts)
!$OMP PARALLEL DO REDUCTION(+:nout)
        do ip=pgroup%ins(is),pgroup%ins(is)+pgroup%nps(is)-1
          if (pgroup%zp(ip)-zgrid < zpminlocal .or.
     &        pgroup%zp(ip)-zgrid >= zpmaxlocal) then
            nout = nout + 1
          endif
        enddo
!$OMP END PARALLEL DO
      enddo
      checkzpartbnd = nout

!$OMP MASTER
      if (ltoptimesubs) timecheckzpartbnd = timecheckzpartbnd + wtime() - substarttime
!$OMP END MASTER

      return
      end

[getzmmnt]
      subroutine parallel_sum_mmnts(zmmnts0,zmmnts)
      use Subtimerstop
      use Parallel
      use Moments
      use Z_Moments
      real(kind=8):: zmmnts0(NUMZMMNT,0:nszmmnt)
      real(kind=8):: zmmnts(0:nzmmnt,NUMZMMNT,0:nszmmnt)

  Use reduction routines to sum whole beam moments across all
  of the processors.  It also shares z moment data at PE boundaries.
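  The whole-beam moments (zmmnts0) and the z moments (zmmnts) are summed over
  all processors with MPI reductions, and the global maxima and minima of the
  particle coordinates and velocities are found with MPI_MAX and MPI_MIN
  reductions.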

      --- temporary for z moments
      real(kind=8),allocatable:: ztemp0(:,:)
      real(kind=8),allocatable:: ztemp(:,:,:)
      real(kind=8),allocatable:: maxes1(:,:),mines1(:,:)
      real(kind=8),allocatable:: maxes2(:,:),mines2(:,:)
      integer(MPIISZ):: nn
      include "mpif.h"
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world

      allocate(ztemp0(NUMZMMNT,0:nszmmnt))
      allocate(ztemp(0:nzmmnt,NUMZMMNT,0:nszmmnt))
      allocate(maxes1(6,0:nszmmnt),mines1(6,0:nszmmnt))
      allocate(maxes2(6,0:nszmmnt),mines2(6,0:nszmmnt))

      --- Do reduction on whole beam moments.
      ztemp0 = zmmnts0
      nn = NUMZMMNT*(1+nszmmnt)
      call MPI_ALLREDUCE(ztemp0,zmmnts0,nn,
     &                   MPI_DOUBLE_PRECISION,MPI_SUM,comm_world_mpiisz,mpierror)

      --- Do reduction on beam z moments.
      ztemp = zmmnts
      nn = (1+nzmmnt)*NUMZMMNT*(1+nszmmnt)
      print*,"PSM1 ",my_index,nn
      call MPI_ALLREDUCE(ztemp,zmmnts,nn,
     &                   MPI_DOUBLE_PRECISION,MPI_SUM,comm_world_mpiisz,mpierror)
      call MPI_REDUCE(ztemp,zmmnts,nn,
     &                MPI_DOUBLE_PRECISION,MPI_SUM,int(0,MPIISZ),
     &                comm_world_mpiisz,mpierror)
      call MPI_BCAST(zmmnts,nn,MPI_DOUBLE_PRECISION,int(0,MPIISZ),
     &                comm_world_mpiisz,mpierror)

      print*,"PSM2 ",my_index
      --- Find the global maxes and mins.
      --- This is done in a way that minimizes interprocessor communication.
      maxes1(1,:) = xmaxp
      maxes1(2,:) = ymaxp
      maxes1(3,:) = zmaxp
      maxes1(4,:) = vxmaxp
      maxes1(5,:) = vymaxp
      maxes1(6,:) = vzmaxp
      mines1(1,:) = xminp
      mines1(2,:) = yminp
      mines1(3,:) = zminp
      mines1(4,:) = vxminp
      mines1(5,:) = vyminp
      mines1(6,:) = vzminp
      call MPI_ALLREDUCE(maxes1,maxes2,int(6*(1+nszmmnt),MPIISZ),
     &                   MPI_DOUBLE_PRECISION,
     &                   MPI_MAX,comm_world_mpiisz,mpierror)
      call MPI_ALLREDUCE(mines1,mines2,int(6*(1+nszmmnt),MPIISZ),
     &                   MPI_DOUBLE_PRECISION,
     &                   MPI_MIN,comm_world_mpiisz,mpierror)
      xmaxp = maxes2(1,:)
      ymaxp = maxes2(2,:)
      zmaxp = maxes2(3,:)
      vxmaxp = maxes2(4,:)
      vymaxp = maxes2(5,:)
      vzmaxp = maxes2(6,:)
      xminp = mines2(1,:)
      yminp = mines2(2,:)
      zminp = mines2(3,:)
      vxminp = mines2(4,:)
      vyminp = mines2(5,:)
      vzminp = mines2(6,:)

      deallocate(ztemp0)
      deallocate(ztemp)
      deallocate(maxes1,mines1)
      deallocate(maxes2,mines2)

!$OMP MASTER
      if (ltoptimesubs) timeparallel_sum_mmnts = timeparallel_sum_mmnts + wtime() - substarttime
!$OMP END MASTER

      return
      end

[finalize_temperature]
      subroutine parallel_sum_temperature()
      use Subtimerstop
      use Temperatures
      integer(ISZ):: nn
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()

  Do a global sum of the data used to calculate local temperatures.

      nn = (1+nxtslices)*(1+nytslices)*nztslices
      call parallelsumrealarray(pnumt,nn)
      call parallelsumrealarray(pnumtw,nn)
      call parallelsumrealarray(vxbart,nn)
      call parallelsumrealarray(vybart,nn)
      call parallelsumrealarray(vzbart,nn)
      call parallelsumrealarray(vxsqbart,nn)
      call parallelsumrealarray(vysqbart,nn)
      call parallelsumrealarray(vzsqbart,nn)
      call parallelsumrealarray(kebart,nn)
      call parallelsumrealarray(kesqbart,nn)

      if (l_temp_rmcorrelations .or. l_temp_rmcrosscorrelations) then

        nn = (1+nxtslicesc)*(1+nytslicesc)*nztslicesc
        call parallelsumrealarray(xbart,nn)
        call parallelsumrealarray(ybart,nn)
        call parallelsumrealarray(zbart,nn)
        call parallelsumrealarray(xsqbart,nn)
        call parallelsumrealarray(ysqbart,nn)
        call parallelsumrealarray(zsqbart,nn)

        if (l_temp_rmcorrelations) then
          call parallelsumrealarray(xvxbart,nn)
          call parallelsumrealarray(yvybart,nn)
          call parallelsumrealarray(zvzbart,nn)
          call parallelsumrealarray(xkebart,nn)
          call parallelsumrealarray(ykebart,nn)
          call parallelsumrealarray(zkebart,nn)
        endif
        
        if (l_temp_rmcrosscorrelations) then
          call parallelsumrealarray(xybart,nn)
          call parallelsumrealarray(xzbart,nn)
          call parallelsumrealarray(yzbart,nn)
          call parallelsumrealarray(xvybart,nn)
          call parallelsumrealarray(xvzbart,nn)
          call parallelsumrealarray(yvxbart,nn)
          call parallelsumrealarray(yvzbart,nn)
          call parallelsumrealarray(zvxbart,nn)
          call parallelsumrealarray(zvybart,nn)
        endif
        
      endif

!$OMP MASTER
      if (ltoptimesubs) timeparallel_sum_temperature = timeparallel_sum_temperature + wtime() - substarttime
!$OMP END MASTER

      return
      end
   Some parallel utility routines

[emitfrac] [emitthresh] [fixrhoxy] [getbeamcom] [inj_setrhomr] [parallel_sum_temperature] [percurr]
      subroutine parallelsumrealarray(data,ndata)
      use Subtimerstop
      use Parallel,Only: comm_world
      integer(ISZ):: ndata
      real(kind=8):: data(ndata)
  Does a sum over all of the processors.  Resulting data is broadcast
  to all processors.

      real(kind=8),allocatable:: data2(:)
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world
      allocate(data2(ndata))

      call MPI_ALLREDUCE(data,data2,int(ndata,MPIISZ),MPI_DOUBLE_PRECISION,
     &                   MPI_SUM,comm_world_mpiisz,mpierror)
      data = data2
      deallocate(data2)

!$OMP MASTER
      if (ltoptimesubs) timeparallelsumrealarray = timeparallelsumrealarray + wtime() - substarttime
!$OMP END MASTER

      return
      end

[multigrid3dsolve]
      subroutine parallelsumrealarraycomm(data,ndata,comm)
      use Subtimerstop
      integer(ISZ):: ndata
      real(kind=8):: data(ndata)
      integer(ISZ):: comm
  Does a sum over all of the processors.  Resulting data is broadcast
  to all processors.

      real(kind=8),allocatable:: data2(:)
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      allocate(data2(ndata))

      comm_world_mpiisz = comm
      call MPI_ALLREDUCE(data,data2,int(ndata,MPIISZ),MPI_DOUBLE_PRECISION,
     &                   MPI_SUM,comm_world_mpiisz,mpierror)
      data = data2
      deallocate(data2)

!$OMP MASTER
      if (ltoptimesubs) timeparallelsumrealarray = timeparallelsumrealarray + wtime() - substarttime
!$OMP END MASTER

      return
      end

[emitfrac] [emitthresh] [stptcl3d]
      subroutine parallelsumintegerarray(data,ndata)
      use Subtimerstop
      use Parallel,Only: comm_world
      integer(ISZ):: ndata
      integer(ISZ):: data(ndata)
  Does a sum over all of the processors.  Resulting data is broadcast
  to all processors.

      integer(MPIISZ),allocatable:: data2(:)
      integer(MPIISZ),allocatable:: data3(:)
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world
      allocate(data2(ndata))

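      --- If the code's integers are four bytes, data can be passed directly
      --- as MPI_INTEGER; otherwise it is first copied into a temporary of
      --- kind MPIISZ.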
      if (ISZ == 4) then
        call MPI_ALLREDUCE(data,data2,int(ndata,MPIISZ),MPI_INTEGER,MPI_SUM,
     &                     comm_world_mpiisz,mpierror)
      else
        allocate(data3(ndata))
        data3 = data
        call MPI_ALLREDUCE(data3,data2,int(ndata,MPIISZ),MPI_INTEGER,MPI_SUM,
     &                     comm_world_mpiisz,mpierror)
        deallocate(data3)
      endif
      data = data2
      deallocate(data2)

!$OMP MASTER
      if (ltoptimesubs) timeparallelsumintegerarray = timeparallelsumintegerarray + wtime() - substarttime
!$OMP END MASTER

      return
      end

[bendfieldsol3d] [emitfrac] [emitthresh] [mgsolveimplicites3d] [paralleltridiag] [rhodia3d] [setlattzt]
      subroutine parallelmaxrealarray(data,ndata)
      use Subtimerstop
      use Parallel,Only: comm_world
      integer(ISZ):: ndata
      real(kind=8):: data(ndata)
  Find the max of data over all processors.  Resulting data is broadcast
  to all processors.
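  A minimal usage sketch (errmax and resid are hypothetical caller variables):
  the global maximum residual of an iterative solve can be found with
      errmax(1) = maxval(abs(resid))
      call parallelmaxrealarray(errmax,1)
  where errmax is a real(kind=8) array of length one.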

      real(kind=8),allocatable:: data2(:)
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world
      allocate(data2(ndata))

      call MPI_ALLREDUCE(data,data2,int(ndata,MPIISZ),MPI_DOUBLE_PRECISION,
     &                   MPI_MAX,comm_world_mpiisz,mpierror)
      data = data2
      deallocate(data2)

!$OMP MASTER
      if (ltoptimesubs) timeparallelmaxrealarray = timeparallelmaxrealarray + wtime() - substarttime
!$OMP END MASTER

      return
      end

[mgsolveimplicites2d] [multigrid2ddielectricsolve] [multigrid2dsolve] [multigrid3dsolve] [multigridberzsolve]
      subroutine parallelmaxrealarraycomm(data,ndata,comm)
      use Subtimerstop
      integer(ISZ):: ndata
      real(kind=8):: data(ndata)
      integer(ISZ):: comm
  Find the max of data over all processors in the given communicator.
  Resulting data is broadcast to all processors in that communicator.

      real(kind=8),allocatable:: data2(:)
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      allocate(data2(ndata))

      comm_world_mpiisz = comm
      call MPI_ALLREDUCE(data,data2,int(ndata,MPIISZ),MPI_DOUBLE_PRECISION,
     &                   MPI_MAX,comm_world_mpiisz,mpierror)
      data = data2
      deallocate(data2)

!$OMP MASTER
      if (ltoptimesubs) timeparallelmaxrealarray = timeparallelmaxrealarray + wtime() - substarttime
!$OMP END MASTER

      return
      end

      subroutine parallelmaxintegerarray(data,ndata)
      use Subtimerstop
      use Parallel,Only: comm_world
      integer(ISZ):: ndata
      integer(ISZ):: data(ndata)
  Find the max over all of the processors.  Resulting data is broadcast
  to all processors.

      integer(MPIISZ),allocatable:: data2(:)
      integer(MPIISZ),allocatable:: data3(:)
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world
      allocate(data2(ndata))

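      --- Handle the integer kinds the same way as in parallelsumintegerarray.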
      if (ISZ == 4) then
        call MPI_ALLREDUCE(data,data2,int(ndata,MPIISZ),MPI_INTEGER,MPI_MAX,
     &                     comm_world_mpiisz,mpierror)
      else
        allocate(data3(ndata))
        data3 = data
        call MPI_ALLREDUCE(data3,data2,int(ndata,MPIISZ),MPI_INTEGER,MPI_MAX,
     &                     comm_world_mpiisz,mpierror)
        deallocate(data3)
      endif
      data = data2
      deallocate(data2)

!$OMP MASTER
      if (ltoptimesubs) timeparallelmaxintegerarray = timeparallelmaxintegerarray + wtime() - substarttime
!$OMP END MASTER

      return
      end

[emitthresh]
      subroutine parallelminrealarray(data,ndata)
      use Subtimerstop
      use Parallel,Only: comm_world
      integer(ISZ):: ndata
      real(kind=8):: data(ndata)
  Find the min of data over all processors.  Resulting data is broadcast
  to all processors.

      real(kind=8),allocatable:: data2(:)
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world
      allocate(data2(ndata))

      call MPI_ALLREDUCE(data,data2,int(ndata,MPIISZ),MPI_DOUBLE_PRECISION,
     &                   MPI_MIN,comm_world_mpiisz,mpierror)
      data = data2
      deallocate(data2)

!$OMP MASTER
      if (ltoptimesubs) timeparallelminrealarray = timeparallelminrealarray + wtime() - substarttime
!$OMP END MASTER

      return
      end

      subroutine parallelminrealarraycomm(data,ndata,comm)
      use Subtimerstop
      integer(ISZ):: ndata
      real(kind=8):: data(ndata)
      integer(ISZ):: comm
  Find the min of data over all processors in the given communicator.
  Resulting data is broadcast to all processors in that communicator.

      real(kind=8),allocatable:: data2(:)
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      allocate(data2(ndata))

      comm_world_mpiisz = comm
      call MPI_ALLREDUCE(data,data2,int(ndata,MPIISZ),MPI_DOUBLE_PRECISION,
     &                   MPI_MIN,comm_world_mpiisz,mpierror)
      data = data2
      deallocate(data2)

!$OMP MASTER
      if (ltoptimesubs) timeparallelminrealarray = timeparallelminrealarray + wtime() - substarttime
!$OMP END MASTER

      return
      end

[particleboundaries_parallel]
      subroutine parallellor(data)
      use Subtimerstop
      use Parallel,Only: comm_world
      logical(ISZ):: data
  Logical OR of data over all processors.  Resulting data is broadcast
  to all processors.

      logical(MPIISZ):: data1,data2
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world

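      --- Copy the value into logicals of kind MPIISZ before the reduction
      --- and copy the reduced result back into data afterwards.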
      data1 = data
      call MPI_ALLREDUCE(data1,data2,int(1,MPIISZ),MPI_LOGICAL,MPI_LOR,
     &                   comm_world_mpiisz,mpierror)
      data = data2

!$OMP MASTER
      if (ltoptimesubs) timeparallellor = timeparallellor + wtime() - substarttime
!$OMP END MASTER

      return
      end

      subroutine parallelbroadcastrealarray(data,ndata,root)
      use Subtimerstop
      use Parallel,Only: comm_world
      integer(ISZ):: ndata,root
      real(kind=8):: data(ndata)
  Broadcast the data to all processors.
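  A minimal usage sketch (params and nparams are hypothetical caller
  variables): values read on processor 0 can be shared with all processors by
      call parallelbroadcastrealarray(params,nparams,0)
  where params is a real(kind=8) array of length nparams and 0 is the root.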

      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world

      call MPI_BCAST(data,int(ndata,MPIISZ),MPI_DOUBLE_PRECISION,
     &               int(root,MPIISZ),comm_world_mpiisz,mpierror)

!$OMP MASTER
      if (ltoptimesubs) timeparallelbroadcastrealarray = timeparallelbroadcastrealarray + wtime() - substarttime
!$OMP END MASTER

      return
      end

[check_fbndz]
      subroutine parallelbarrier()
      use Subtimerstop
      use Parallel,Only: comm_world
  Calls an MPI barrier, blocking until all processors have reached this point.
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world

      call MPI_BARRIER(comm_world_mpiisz,mpierror)

!$OMP MASTER
      if (ltoptimesubs) timeparallelbarrier = timeparallelbarrier + wtime() - substarttime
!$OMP END MASTER

      return
      end

[getinj_phi] [getinj_phi_3d] [getinj_phi_mr] [gettinj_phi] [gtlchg3dfromrho] [gtlchgrzfromrho] [rhodia3d] [sezax3d] [sphiax3d] [srhoax3d]
      subroutine parallelnonzerorealarray(data,ndata)
      use Subtimerstop
      use Parallel,Only: comm_world
      integer(ISZ):: ndata
      real(kind=8):: data(ndata)
  For each element, find the nonzero value of data over all processors.
  This assumes that, for any given index, the nonzero values are the same on
  every processor that has one.  Resulting data is broadcast to all
  processors.

      real(kind=8),allocatable:: dmax(:),dmin(:)
      integer(MPIISZ):: mpierror,comm_world_mpiisz
      include "mpif.h"
      real(kind=8):: substarttime,wtime
      if (ltoptimesubs) substarttime = wtime()
      comm_world_mpiisz = comm_world

      allocate(dmax(ndata),dmin(ndata))

      call MPI_ALLREDUCE(data,dmax,int(ndata,MPIISZ),MPI_DOUBLE_PRECISION,MPI_MAX,
     &                   comm_world_mpiisz,mpierror)
      call MPI_ALLREDUCE(data,dmin,int(ndata,MPIISZ),MPI_DOUBLE_PRECISION,MPI_MIN,
     &                   comm_world_mpiisz,mpierror)
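      --- Where the maximum over processors is nonzero, that is the nonzero
      --- value; elsewhere take the minimum, which picks up entries whose
      --- nonzero value is negative.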
      where (dmax /= 0)
        data = dmax
      elsewhere
        data = dmin
      endwhere

      deallocate(dmax,dmin)

!$OMP MASTER
      if (ltoptimesubs) timeparallelnonzerorealarray = timeparallelnonzerorealarray + wtime() - substarttime
!$OMP END MASTER

      return
      end