001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.local.stream.duct;
023
024import java.lang.ref.WeakReference;
025import java.lang.reflect.UndeclaredThrowableException;
026import java.util.ArrayList;
027import java.util.concurrent.Callable;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032import java.util.concurrent.Future;
033import java.util.concurrent.LinkedBlockingQueue;
034
035import cascading.flow.stream.duct.Duct;
036import cascading.flow.stream.duct.Fork;
037import cascading.tuple.TupleEntry;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * This "Fork" avoids a possible deadlock in Fork-and-Join scenarios by running downstream edges into parallel threads.
043 */
044public class ParallelFork<Outgoing> extends Fork<TupleEntry, Outgoing>
045  {
046  private static final Logger LOG = LoggerFactory.getLogger( ParallelFork.class );
047
048  abstract static class Message
049    {
050    final protected Duct previous;
051
052    public Message( Duct previous )
053      {
054      this.previous = previous;
055      }
056
057    abstract public void passOn( Duct next );
058
059    abstract public boolean isTermination();
060    }
061
062  static final class StartMessage extends Message
063    {
064    final CountDownLatch startLatch;
065
066    public StartMessage( Duct previous, CountDownLatch startLatch )
067      {
068      super( previous );
069      this.startLatch = startLatch;
070      }
071
072    public void passOn( Duct next )
073      {
074      startLatch.countDown();
075      next.start( previous );
076      }
077
078    public boolean isTermination()
079      {
080      return false;
081      }
082    }
083
084  static final class ReceiveMessage extends Message
085    {
086    final int ordinal;
087    final TupleEntry tuple;
088
089    public ReceiveMessage( Duct previous, int ordinal, TupleEntry tuple )
090      {
091      super( previous );
092      this.ordinal = ordinal;
093
094      // we make a new copy right here, to avoid cross-thread trouble when upstream changes the tuple
095      this.tuple = new TupleEntry( tuple );
096      }
097
098    public void passOn( Duct next )
099      {
100      next.receive( previous, ordinal, tuple );
101      }
102
103    public boolean isTermination()
104      {
105      return false;
106      }
107    }
108
109  static final class CompleteMessage extends Message
110    {
111    public CompleteMessage( Duct previous )
112      {
113      super( previous );
114      }
115
116    public void passOn( Duct next )
117      {
118      next.complete( previous );
119      }
120
121    public boolean isTermination()
122      {
123      return true;
124      }
125    }
126
127  private final ArrayList<LinkedBlockingQueue<Message>> buffers;
128  private final ExecutorService executor;
129  private final ArrayList<Callable<Throwable>> actions;
130  private final ArrayList<Future<Throwable>> futures;
131
132  public ParallelFork( Duct[] allNext )
133    {
134    super( allNext );
135
136    // Obvious choices for nThread in newFixedThreadPool:
137    // nThreads = allNext.length. Potential to create a lot of thread-thrashing on machines with few cores, but
138    // the OS scheduler should ensure any executable thread gets a chance to proceed (and possibly
139    // complete, enabling others down a Local*Gate to proceed)
140    // nThreads = #of CPU. Would work, possibly by chance as long as #of CPU is "big enough" (see below)
141    // nThreads=1 : "parallel" is parallel with respect to upstream. This could work sometimes, but will still
142    // deadlock in the Fork-CoGroup-HashJoin scenario, as the other side of join could still be starved by
143    // one side not completing.
144    // nThreads = max(# of pipes merged into a CoGroup or HashJoin downstream from here). This is the minimum
145    // required to guarantee one side can't starve another. It COULD probably be queried from the flow graph,
146    // factoring in for all potential combinations...
147    //
148    // Therefore, the easy safe choice is to take allNext.length.
149    //
150    this.executor = Executors.newFixedThreadPool( allNext.length );
151
152    ArrayList<LinkedBlockingQueue<Message>> buffers = new ArrayList<>( allNext.length );
153    ArrayList<Future<Throwable>> futures = new ArrayList<>( allNext.length );
154    ArrayList<Callable<Throwable>> actions = new ArrayList<>( allNext.length );
155
156    for( final Duct anAllNext : allNext )
157      {
158      final LinkedBlockingQueue<Message> queue = new LinkedBlockingQueue<>();
159
160      buffers.add( queue );
161      Callable<Throwable> action = new Callable<Throwable>()
162        {
163        @Override
164        public Throwable call() throws Exception
165          {
166          try
167            {
168            while( true )
169              {
170              Message message = queue.take();
171              message.passOn( anAllNext );
172
173              if( message.isTermination() )
174                return null;
175              }
176            }
177          catch( Throwable throwable )
178            {
179            return throwable;
180            }
181          }
182        };
183
184      actions.add( action );
185      }
186
187    this.buffers = buffers;
188    this.actions = actions;
189    this.futures = futures;
190    }
191
192  @Override
193  public void initialize()
194    {
195    super.initialize();
196    }
197
198  private void broadcastMessage( Message message )
199    {
200    for( LinkedBlockingQueue<Message> queue : buffers )
201      queue.offer( message );
202    }
203
204  private WeakReference<Duct> started = null;
205
206  @Override
207  public void start( Duct previous )
208    {
209    LOG.debug( "StartMessage {} BEGIN", previous );
210
211    synchronized( this )
212      {
213      if( started != null )
214        {
215        LOG.error( "ParallelFork already started! former previous={}, new previous={}", started.get(), previous );
216        return;
217        }
218
219      if( completed != null )
220        throw new IllegalStateException( "cannot start an already completed ParallelFork" );
221
222      started = new WeakReference<>( previous );
223      }
224
225    try
226      {
227      for( Callable<Throwable> action : actions )
228        futures.add( executor.submit( action ) );
229
230      CountDownLatch startLatch = new CountDownLatch( allNext.length );
231      broadcastMessage( new StartMessage( previous, startLatch ) );
232
233      startLatch.await(); // wait for all threads to have started
234      }
235    catch( InterruptedException iex )
236      {
237      throw new UndeclaredThrowableException( iex );
238      }
239    }
240
241  @Override
242  public void receive( Duct previous, int ordinal, TupleEntry incoming )
243    {
244    // incoming is copied once for each downstream pipe, within the current thread.
245    broadcastMessage( new ReceiveMessage( previous, ordinal, incoming ) );
246    }
247
248  private WeakReference<Duct> completed = null; /* records origin duct */
249
250  @Override
251  public void complete( Duct previous )
252    {
253    synchronized( this )
254      {
255      if( completed != null )
256        {
257        LOG.error( "ParallelFork already complete! former previous={} new previous={}", completed.get(), previous );
258        return;
259        }
260
261      completed = new WeakReference<>( previous );
262      }
263
264    // the CompleteMessage will cause the downstream threads to complete
265    broadcastMessage( new CompleteMessage( previous ) );
266
267    try
268      {
269      for( Future<Throwable> future : futures )
270        {
271        Throwable throwable;
272        try
273          {
274          throwable = future.get();
275          }
276        catch( InterruptedException iex )
277          {
278          throwable = iex;
279          }
280        catch( ExecutionException cex )
281          {
282          throwable = cex;
283          }
284
285        if( throwable != null )
286          throw new RuntimeException( throwable );
287        }
288      }
289    finally
290      {
291      executor.shutdown();
292      }
293    }
294  }