001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.assertion;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.regex.Matcher;
025    import java.util.regex.Pattern;
026    
027    import cascading.flow.FlowProcess;
028    import cascading.operation.GroupAssertion;
029    import cascading.operation.GroupAssertionCall;
030    import cascading.operation.OperationCall;
031    import cascading.tuple.Tuple;
032    import cascading.tuple.TupleEntry;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     *
038     */
039    public abstract class AssertGroupBase extends BaseAssertion<AssertGroupBase.Context> implements GroupAssertion<AssertGroupBase.Context>
040      {
041      /** Field LOG */
042      private static final Logger LOG = LoggerFactory.getLogger( AssertGroupBase.class );
043    
044      /** Field patternString */
045      protected String patternString;
046    
047      /** Field size */
048      protected final long size;
049    
050      public static class Context
051        {
052        Pattern pattern;
053        Long count;
054        String fields;
055        String group;
056    
057        public Context set( long count, String fields, String group )
058          {
059          this.count = count;
060          this.fields = fields;
061          this.group = group;
062    
063          return this;
064          }
065    
066        public Context reset()
067          {
068          count = null;
069          fields = null;
070          group = null;
071    
072          return this;
073          }
074        }
075    
076      @ConstructorProperties({"message", "size"})
077      public AssertGroupBase( String message, long size )
078        {
079        super( message );
080        this.size = size;
081        }
082    
083      @ConstructorProperties({"message", "patternString", "size"})
084      protected AssertGroupBase( String message, String patternString, long size )
085        {
086        super( message );
087        this.patternString = patternString;
088        this.size = size;
089        }
090    
091      public String getPatternString()
092        {
093        return patternString;
094        }
095    
096      public long getSize()
097        {
098        return size;
099        }
100    
101      private Pattern getPattern()
102        {
103        Pattern pattern;
104    
105        if( patternString == null )
106          pattern = Pattern.compile( ".*" );
107        else
108          pattern = Pattern.compile( patternString );
109    
110        return pattern;
111        }
112    
113      private boolean matchWholeTuple( Tuple input, Pattern pattern )
114        {
115        if( patternString == null )
116          return true;
117    
118        Matcher matcher = pattern.matcher( input.toString( "\t", false ) );
119    
120        LOG.debug( "pattern: {}, matches: {}", pattern, matcher.matches() );
121    
122        return matcher.matches();
123        }
124    
125      @Override
126      public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
127        {
128        operationCall.setContext( new Context() );
129        operationCall.getContext().pattern = getPattern();
130        }
131    
132      @Override
133      public void start( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
134        {
135        TupleEntry groupEntry = assertionCall.getGroup();
136        Context context = assertionCall.getContext();
137    
138        // didn't match, so skip
139        if( !matchWholeTuple( groupEntry.getTuple(), context.pattern ) )
140          context.reset();
141        else
142          context.set( 0L, groupEntry.getFields().print(), groupEntry.getTuple().print() );
143        }
144    
145      @Override
146      public void aggregate( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
147        {
148        Long groupSize = assertionCall.getContext().count;
149    
150        // didn't match, so skip
151        if( groupSize != null )
152          assertionCall.getContext().count += 1L;
153        }
154    
155      @Override
156      public void doAssert( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
157        {
158        Context context = assertionCall.getContext();
159        Long groupSize = context.count;
160    
161        if( groupSize == null ) // didn't match, so skip
162          return;
163    
164        if( assertFails( groupSize ) )
165          {
166          if( patternString == null )
167            fail( groupSize, size, context.fields, context.group );
168          else
169            fail( patternString, groupSize, size, context.fields, context.group );
170          }
171        }
172    
173      protected abstract boolean assertFails( Long groupSize );
174    
175      @Override
176      public boolean equals( Object object )
177        {
178        if( this == object )
179          return true;
180        if( !( object instanceof AssertGroupBase ) )
181          return false;
182        if( !super.equals( object ) )
183          return false;
184    
185        AssertGroupBase that = (AssertGroupBase) object;
186    
187        if( size != that.size )
188          return false;
189        if( patternString != null ? !patternString.equals( that.patternString ) : that.patternString != null )
190          return false;
191    
192        return true;
193        }
194    
195      @Override
196      public int hashCode()
197        {
198        int result = super.hashCode();
199        result = 31 * result + ( patternString != null ? patternString.hashCode() : 0 );
200        result = 31 * result + (int) ( size ^ size >>> 32 );
201        return result;
202        }
203      }