/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple;

import java.io.IOException;

/**
 * Class TupleEntryCollector is the abstract base that lets {@link cascading.operation.BaseOperation}
 * instances emit one or more result {@link Tuple} values.
 * <p/>
 * The general rule in Cascading: a Tuple that was handed to you must be neither changed nor cached. Attempts to
 * modify such a Tuple result in an Exception. Caching is harder to prevent, see below.
 * <p/>
 * A Tuple you created yourself may be modified and re-used.
 * <p/>
 * Calling {@link #add(Tuple)} or {@link #add(TupleEntry)} passes a Tuple to the downstream pipes and operations.
 * Because no downstream operation may modify or cache that Tuple instance, it is safe to re-use the instance once
 * {@code add()} has returned.
 * <p/>
 * That said, the underlying platforms do cache Tuple copies in order to perform specific operations. Currently only
 * a shallow copy is made (via the {@link Tuple} copy constructor), so a mutable type or collection placed inside a
 * Tuple is not itself copied, but will likely be cached whenever a copy of the Tuple passed downstream is made.
 * <p/>
 * Any later change to such a nested type or collection is therefore reflected in the cached copy, a likely source
 * of hard to find errors.
 * <p/>
 * There is currently no way to request a deep copy when a Tuple copy is made.
 */
public abstract class TupleEntryCollector
{
  /**
   * Entry that wraps outgoing values and carries the declared fields. It starts out with {@link Fields#UNKNOWN}
   * and no tuple. NOTE(review): the trailing {@code true} presumably flags the entry as unmodifiable -- confirm
   * against the {@link TupleEntry} constructor.
   */
  protected TupleEntry tupleEntry = new TupleEntry( Fields.UNKNOWN, null, true );

  /** Creates a collector that keeps the initial entry declared as {@link Fields#UNKNOWN}. */
  protected TupleEntryCollector()
  {
  }

  /**
   * Creates a collector and applies the given declared fields via {@link #setFields(Fields)}.
   *
   * @param declared of type Fields
   */
  public TupleEntryCollector( Fields declared )
  {
    setFields( declared );
  }

  /**
   * Replaces the internal entry with one declared as the given fields and sized to match.
   * <p/>
   * Unknown or ALL field declarations leave the current entry untouched.
   *
   * @param declared of type Fields
   * @throws IllegalArgumentException if {@code declared} is null
   */
  public void setFields( Fields declared )
  {
    if( declared == null )
    {
      throw new IllegalArgumentException( "declared fields must not be null" );
    }

    // only a concrete declaration can back a fixed-size entry
    boolean hasConcreteFields = !declared.isUnknown() && !declared.isAll();

    if( hasConcreteFields )
    {
      this.tupleEntry = new TupleEntry( declared, Tuple.size( declared.size() ), true );
    }
  }

  /**
   * Emits the given {@link TupleEntry} into the outgoing stream.
   * <p/>
   * If the declared fields are unknown, or equal the fields of the given entry, the entry is forwarded as is.
   * Otherwise the declared fields are selected from it and forwarded through the internal entry. When the values
   * are already in declared order, {@link #add(Tuple)} skips this field comparison and selection.
   * <p/>
   * See {@link cascading.tuple.TupleEntryCollector} on when and how to re-use a Tuple instance.
   *
   * @param tupleEntry of type TupleEntry
   * @throws TupleException if the declared fields cannot be selected from the entry, or collecting fails
   */
  public void add( TupleEntry tupleEntry )
  {
    Fields declaredFields = this.tupleEntry.getFields();

    // nothing to reconcile, pass the caller's entry straight through
    if( declaredFields.isUnknown() || declaredFields.equals( tupleEntry.getFields() ) )
    {
      safeCollect( tupleEntry );
      return;
    }

    this.tupleEntry.setTuple( selectTupleFrom( tupleEntry, declaredFields ) );

    safeCollect( this.tupleEntry );
  }

  /**
   * Selects the expected fields from the given entry, rethrowing a selection failure with a message that names
   * both the given and the expected fields.
   */
  private Tuple selectTupleFrom( TupleEntry tupleEntry, Fields expectedFields )
  {
    try
    {
      return tupleEntry.selectTuple( expectedFields );
    }
    catch( TupleException cause )
    {
      String message = "given TupleEntry fields: " + tupleEntry.getFields().printVerbose()
        + " do not match the operation declaredFields: " + expectedFields.printVerbose()
        + ", operations must emit tuples that match the fields they declare as output";

      throw new TupleException( message, cause );
    }
  }

  /**
   * Emits the given {@link Tuple} into the outgoing stream.
   * <p/>
   * Unless the declared fields are unknown, the tuple must have exactly as many values as there are declared
   * fields. The unmodifiable state the tuple had on entry is restored before this method returns, whether or not
   * collecting succeeded.
   * <p/>
   * See {@link cascading.tuple.TupleEntryCollector} on when and how to re-use a Tuple instance.
   *
   * @param tuple of type Tuple
   * @throws TupleException if the tuple size does not match the declared fields, or collecting fails
   */
  public void add( Tuple tuple )
  {
    Fields declaredFields = tupleEntry.getFields();
    boolean sizeMismatch = !declaredFields.isUnknown() && declaredFields.size() != tuple.size();

    if( sizeMismatch )
    {
      throw new TupleException( "operation added the wrong number of fields, expected: " + declaredFields.print() + ", got result size: " + tuple.size() );
    }

    // remember the caller's flag so it can be put back once the tuple has been handed downstream
    boolean wasUnmodifiable = tuple.isUnmodifiable();

    tupleEntry.setTuple( tuple );

    try
    {
      safeCollect( tupleEntry );
    }
    finally
    {
      Tuples.setUnmodifiable( tuple, wasUnmodifiable );
    }
  }

  /** Delegates to {@link #collect(TupleEntry)}, converting a checked IOException into a TupleException. */
  private void safeCollect( TupleEntry tupleEntry )
  {
    try
    {
      collect( tupleEntry );
    }
    catch( IOException ioFailure )
    {
      throw new TupleException( "unable to collect tuple", ioFailure );
    }
  }

  /**
   * Receives every entry passed to one of the {@code add()} methods; implemented by sub-classes.
   *
   * @param tupleEntry of type TupleEntry
   * @throws IOException when the entry cannot be collected
   */
  protected abstract void collect( TupleEntry tupleEntry ) throws IOException;

  /**
   * Method close closes the underlying resource being written to. This base implementation does nothing.
   * <p/>
   * This method should be called when an instance is returned via
   * {@link cascading.tap.Tap#openForWrite(cascading.flow.FlowProcess)}
   * and no more {@link Tuple} instances will be written out.
   * <p/>
   * This method must not be called when an instance is returned from {@code getOutputCollector()} from any of
   * the relevant {@link cascading.operation.OperationCall} implementations (inside a Function, Aggregator, or Buffer).
   */
  public void close()
  {
    // do nothing
  }
}