001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.cascade;
022
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.Map;
026
027import cascading.flow.Flow;
028import cascading.property.UnitOfWorkDef;
029import cascading.tap.Tap;
030
031/**
032 * Class CascadeDef is a fluent interface for defining a {@link Cascade}.
033 * <p/>
034 * This allows for ad-hoc building of Cascade data and meta-data like tags.
035 * <p/>
036 * Instead of calling one of the {@link CascadeConnector} connect methods, {@link CascadeConnector#connect(CascadeDef)}
037 * can be called.
038 *
039 * @see cascading.property.UnitOfWorkDef
040 * @see cascading.flow.FlowDef
041 */
042public class CascadeDef extends UnitOfWorkDef<CascadeDef>
043  {
044  Map<String, Flow> flows = new HashMap<String, Flow>();
045  int maxConcurrentFlows = -1;
046
047  /**
048   * Creates a new instance of a CascadeDef.
049   *
050   * @return a CascadeDef
051   */
052  public static CascadeDef cascadeDef()
053    {
054    return new CascadeDef();
055    }
056
057  /** Constructor CascadeDef creates a new CascadeDef instance. */
058  public CascadeDef()
059    {
060    }
061
062  /**
063   * Method getFlows returns the flows of this CascadeDef object.
064   *
065   * @return the flows (type Collection<Flow>) of this CascadeDef object.
066   */
067  public Collection<Flow> getFlows()
068    {
069    return flows.values();
070    }
071
072  /**
073   * Method getFlowsArray returns the flows as an array of this CascadeDef object.
074   *
075   * @return the flowsArray (type Flow[]) of this CascadeDef object.
076   */
077  public Flow[] getFlowsArray()
078    {
079    return getFlows().toArray( new Flow[ flows.size() ] );
080    }
081
082  /**
083   * Method addFlow adds a new {@link cascading.flow.Flow} instance that is intended to participate in a {@link Cascade}.
084   *
085   * @param flow of Flow
086   * @return CascadeDef
087   */
088  public CascadeDef addFlow( Flow flow )
089    {
090    if( flow == null )
091      return this;
092
093    if( flows.containsKey( flow.getName() ) )
094      throw new CascadeException( "all flow names must be unique, found duplicate: " + flow.getName() );
095
096    Collection<Tap> sinks = flow.getSinksCollection();
097
098    for( Tap sink : sinks )
099      {
100      String fullIdentifier = sink.getFullIdentifier( flow.getConfig() );
101
102      for( Flow existingFlow : flows.values() )
103        {
104        Collection<Tap> existingSinks = existingFlow.getSinksCollection();
105
106        for( Tap existingSink : existingSinks )
107          {
108          if( fullIdentifier.equals( existingSink.getFullIdentifier( existingFlow.getConfig() ) ) )
109            throw new CascadeException( "the flow: " + flow.getName() + ", has a sink identifier: " + fullIdentifier + ", in common with the flow: " + existingFlow.getName() );
110          }
111        }
112      }
113
114    flows.put( flow.getName(), flow );
115
116    return this;
117    }
118
119  /**
120   * Method addFlows adds many new {@link cascading.flow.Flow} instances intended to participate in a {@link Cascade}.
121   *
122   * @param flows of Flow[]
123   * @return CascadeDef
124   */
125  public CascadeDef addFlows( Flow... flows )
126    {
127    for( Flow flow : flows )
128      addFlow( flow );
129
130    return this;
131    }
132
133  /**
134   * Method addFlows adds many new {@link cascading.flow.Flow} instances intended to participate in a {@link Cascade}.
135   *
136   * @param flows of Collection<Flow>
137   * @return CascadeDef
138   */
139  public CascadeDef addFlows( Collection<Flow> flows )
140    {
141    for( Flow flow : flows )
142      addFlow( flow );
143
144    return this;
145    }
146
147  public CascadeDef setMaxConcurrentFlows( int maxConcurrentFlows )
148    {
149    this.maxConcurrentFlows = maxConcurrentFlows;
150
151    return this;
152    }
153
154  public int getMaxConcurrentFlows()
155    {
156    return maxConcurrentFlows;
157    }
158  }