001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.management;
022    
023    import java.util.Collection;
024    import java.util.List;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.ExecutorService;
027    import java.util.concurrent.Executors;
028    import java.util.concurrent.Future;
029    import java.util.concurrent.TimeUnit;
030    
031    /**
032     * Class UnitOfWorkExecutorStrategy uses a simple {@link Executors#newFixedThreadPool(int)} {@link ExecutorService}
033     * to spawn threads.
034     * <p/>
035     * This is the default spawn strategy.
036     */
037    public class UnitOfWorkExecutorStrategy implements UnitOfWorkSpawnStrategy
038      {
039      private ExecutorService executor;
040    
041      public List<Future<Throwable>> start( UnitOfWork unitOfWork, int maxConcurrentThreads, Collection<Callable<Throwable>> values ) throws InterruptedException
042        {
043        executor = Executors.newFixedThreadPool( maxConcurrentThreads );
044    
045        List<Future<Throwable>> futures = executor.invokeAll( values ); // todo: consider submit()
046    
047        executor.shutdown(); // don't accept any more work
048    
049        return futures;
050        }
051    
052      @Override
053      public boolean isCompleted( UnitOfWork unitOfWork )
054        {
055        return executor == null || executor.isTerminated();
056        }
057    
058      @Override
059      public void complete( UnitOfWork unitOfWork, int duration, TimeUnit unit ) throws InterruptedException
060        {
061        if( executor == null )
062          return;
063    
064        executor.awaitTermination( duration, unit );
065        }
066      }