/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.local.stream.duct;

import java.lang.ref.WeakReference;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.Fork;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This "Fork" avoids a possible deadlock in Fork-and-Join scenarios by running each downstream edge in its own thread.
 */
public class ParallelFork<Outgoing> extends Fork<TupleEntry, Outgoing>
  {
  private static final Logger LOG = LoggerFactory.getLogger( ParallelFork.class );

  abstract static class Message
    {
    protected final Duct previous;

    public Message( Duct previous )
      {
      this.previous = previous;
      }

    public abstract void passOn( Duct next );

    public abstract boolean isTermination();
    }

  static final class StartMessage extends Message
    {
    final CountDownLatch startLatch;

    public StartMessage( Duct previous, CountDownLatch startLatch )
      {
      super( previous );
      this.startLatch = startLatch;
      }

    public void passOn( Duct next )
      {
      startLatch.countDown();
      next.start( previous );
      }

    public boolean isTermination()
      {
      return false;
      }
    }

  static final class ReceiveMessage extends Message
    {
    final int ordinal;
    final TupleEntry tuple;

    public ReceiveMessage( Duct previous, int ordinal, TupleEntry tuple )
      {
      super( previous );
      this.ordinal = ordinal;

      // make a new copy right here, to avoid cross-thread trouble when upstream changes the tuple
      this.tuple = new TupleEntry( tuple );
      }

    public void passOn( Duct next )
      {
      next.receive( previous, ordinal, tuple );
      }

    public boolean isTermination()
      {
      return false;
      }
    }

  static final class CompleteMessage extends Message
    {
    public CompleteMessage( Duct previous )
      {
      super( previous );
      }

    public void passOn( Duct next )
      {
      next.complete( previous );
      }

    public boolean isTermination()
      {
      return true;
      }
    }

  private final ArrayList<LinkedBlockingQueue<Message>> buffers;
  private final ExecutorService executor;
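  // one queue-draining action per outgoing branch; the matching futures hold the
  // Throwable, if any, returned by an action, so complete() can rethrow it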
  private final ArrayList<Callable<Throwable>> actions;
  private final ArrayList<Future<Throwable>> futures;

  public ParallelFork( Duct[] allNext )
    {
    super( allNext );

    // Obvious choices for nThreads in newFixedThreadPool:
    //
    // nThreads = allNext.length: potential to create a lot of thread-thrashing on machines with few
    //   cores, but the OS scheduler should ensure any runnable thread gets a chance to proceed (and
    //   possibly complete, enabling others downstream of a Local*Gate to proceed).
    //
    // nThreads = # of CPUs: would work, possibly by chance, as long as the # of CPUs is "big enough"
    //   (see below).
    //
    // nThreads = 1: "parallel" only with respect to upstream. This could work sometimes, but will still
    //   deadlock in the Fork-CoGroup-HashJoin scenario, as one side of the join could still be starved
    //   by the other side not completing.
    //
    // nThreads = max(# of pipes merged into a CoGroup or HashJoin downstream from here): the minimum
    //   required to guarantee one side can't starve another. It could probably be queried from the
    //   flow graph, factoring in all potential combinations...
    //
    // Therefore, the easy safe choice is to take allNext.length.
    this.executor = Executors.newFixedThreadPool( allNext.length );

    ArrayList<LinkedBlockingQueue<Message>> buffers = new ArrayList<>( allNext.length );
    ArrayList<Future<Throwable>> futures = new ArrayList<>( allNext.length );
    ArrayList<Callable<Throwable>> actions = new ArrayList<>( allNext.length );

    for( final Duct anAllNext : allNext )
      {
      final LinkedBlockingQueue<Message> queue = new LinkedBlockingQueue<>();

      buffers.add( queue );

      Callable<Throwable> action = new Callable<Throwable>()
        {
        @Override
        public Throwable call() throws Exception
          {
          try
            {
            while( true )
              {
              Message message = queue.take();

              message.passOn( anAllNext );

              if( message.isTermination() )
                return null;
              }
            }
          catch( Throwable throwable )
            {
            return throwable;
            }
          }
        };

      actions.add( action );
      }

    this.buffers = buffers;
    this.actions = actions;
    this.futures = futures;
    }

  @Override
  public void initialize()
    {
    super.initialize();
    }

  private void broadcastMessage( Message message )
    {
    for( LinkedBlockingQueue<Message> queue : buffers )
      queue.offer( message );
    }

  private WeakReference<Duct> started = null;
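  // start() submits one worker per branch, then blocks on a latch until every
  // worker has taken its StartMessage, guaranteeing each queue is being drained
  // before receive() broadcasts any tuples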
  @Override
  public void start( Duct previous )
    {
    LOG.debug( "StartMessage {} BEGIN", previous );

    synchronized( this )
      {
      if( started != null )
        {
        LOG.error( "ParallelFork already started! former previous={}, new previous={}", started.get(), previous );
        return;
        }

      if( completed != null )
        throw new IllegalStateException( "cannot start an already completed ParallelFork" );

      started = new WeakReference<>( previous );
      }

    try
      {
      for( Callable<Throwable> action : actions )
        futures.add( executor.submit( action ) );

      CountDownLatch startLatch = new CountDownLatch( allNext.length );

      broadcastMessage( new StartMessage( previous, startLatch ) );

      startLatch.await(); // wait for all threads to have started
      }
    catch( InterruptedException iex )
      {
      throw new UndeclaredThrowableException( iex );
      }
    }

  @Override
  public void receive( Duct previous, int ordinal, TupleEntry incoming )
    {
    // incoming is copied once for each downstream pipe, within the current thread
    broadcastMessage( new ReceiveMessage( previous, ordinal, incoming ) );
    }

  private WeakReference<Duct> completed = null; /* records origin duct */

  @Override
  public void complete( Duct previous )
    {
    synchronized( this )
      {
      if( completed != null )
        {
        LOG.error( "ParallelFork already complete! former previous={} new previous={}", completed.get(), previous );
        return;
        }

      completed = new WeakReference<>( previous );
      }

    // the CompleteMessage will cause the downstream threads to complete
    broadcastMessage( new CompleteMessage( previous ) );

    try
      {
      for( Future<Throwable> future : futures )
        {
        Throwable throwable;

        try
          {
          throwable = future.get();
          }
        catch( InterruptedException iex )
          {
          throwable = iex;
          }
        catch( ExecutionException cex )
          {
          throwable = cex;
          }

        if( throwable != null )
          throw new RuntimeException( throwable );
        }
      }
    finally
      {
      executor.shutdown();
      }
    }
  }
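/*
 * A minimal sketch of the fork-and-join assembly shape that motivates this duct,
 * assuming the local planner routes such a split through a ParallelFork; the pipe
 * names and the "id" field below are illustrative, not taken from this file.
 *
 *   Pipe source = new Pipe( "source" );
 *   Pipe lhs = new Pipe( "lhs", source ); // first branch of the fork
 *   Pipe rhs = new Pipe( "rhs", source ); // second branch of the fork
 *   Pipe joined = new HashJoin( lhs, new Fields( "id" ), rhs, new Fields( "id" ) );
 *
 * With a single thread pushing tuples into both branches, the joining gate can
 * starve waiting on whichever branch is not currently being fed; giving each
 * branch its own thread removes that dependency cycle.
 */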