001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.assertion;
022
023import java.beans.ConstructorProperties;
024import java.util.regex.Matcher;
025import java.util.regex.Pattern;
026
027import cascading.flow.FlowProcess;
028import cascading.management.annotation.Property;
029import cascading.management.annotation.PropertyDescription;
030import cascading.management.annotation.Visibility;
031import cascading.operation.GroupAssertion;
032import cascading.operation.GroupAssertionCall;
033import cascading.operation.OperationCall;
034import cascading.tuple.Tuple;
035import cascading.tuple.TupleEntry;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 *
041 */
042public abstract class AssertGroupBase extends BaseAssertion<AssertGroupBase.Context> implements GroupAssertion<AssertGroupBase.Context>
043  {
044  /** Field LOG */
045  private static final Logger LOG = LoggerFactory.getLogger( AssertGroupBase.class );
046
047  /** Field patternString */
048  protected String patternString;
049
050  /** Field size */
051  protected final long size;
052
053  public static class Context
054    {
055    Pattern pattern;
056    Long count;
057    String fields;
058    String group;
059
060    public Context set( long count, String fields, String group )
061      {
062      this.count = count;
063      this.fields = fields;
064      this.group = group;
065
066      return this;
067      }
068
069    public Context reset()
070      {
071      count = null;
072      fields = null;
073      group = null;
074
075      return this;
076      }
077    }
078
079  @ConstructorProperties({"message", "size"})
080  public AssertGroupBase( String message, long size )
081    {
082    super( message );
083    this.size = size;
084    }
085
086  @ConstructorProperties({"message", "patternString", "size"})
087  protected AssertGroupBase( String message, String patternString, long size )
088    {
089    super( message );
090    this.patternString = patternString;
091    this.size = size;
092    }
093
094  @Property(name = "patternString", visibility = Visibility.PRIVATE)
095  @PropertyDescription("The regular expression pattern string.")
096  public String getPatternString()
097    {
098    return patternString;
099    }
100
101  public long getSize()
102    {
103    return size;
104    }
105
106  private Pattern getPattern()
107    {
108    Pattern pattern;
109
110    if( patternString == null )
111      pattern = Pattern.compile( ".*" );
112    else
113      pattern = Pattern.compile( patternString );
114
115    return pattern;
116    }
117
118  private boolean matchWholeTuple( Tuple input, Pattern pattern )
119    {
120    if( patternString == null )
121      return true;
122
123    Matcher matcher = pattern.matcher( input.toString( "\t", false ) );
124
125    LOG.debug( "pattern: {}, matches: {}", pattern, matcher.matches() );
126
127    return matcher.matches();
128    }
129
130  @Override
131  public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
132    {
133    operationCall.setContext( new Context() );
134    operationCall.getContext().pattern = getPattern();
135    }
136
137  @Override
138  public void start( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
139    {
140    TupleEntry groupEntry = assertionCall.getGroup();
141    Context context = assertionCall.getContext();
142
143    // didn't match, so skip
144    if( !matchWholeTuple( groupEntry.getTuple(), context.pattern ) )
145      context.reset();
146    else
147      context.set( 0L, groupEntry.getFields().print(), groupEntry.getTuple().print() );
148    }
149
150  @Override
151  public void aggregate( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
152    {
153    Long groupSize = assertionCall.getContext().count;
154
155    // didn't match, so skip
156    if( groupSize != null )
157      assertionCall.getContext().count += 1L;
158    }
159
160  @Override
161  public void doAssert( FlowProcess flowProcess, GroupAssertionCall<Context> assertionCall )
162    {
163    Context context = assertionCall.getContext();
164    Long groupSize = context.count;
165
166    if( groupSize == null ) // didn't match, so skip
167      return;
168
169    if( assertFails( groupSize ) )
170      {
171      if( patternString == null )
172        fail( groupSize, size, context.fields, context.group );
173      else
174        fail( patternString, groupSize, size, context.fields, context.group );
175      }
176    }
177
178  protected abstract boolean assertFails( Long groupSize );
179
180  @Override
181  public boolean equals( Object object )
182    {
183    if( this == object )
184      return true;
185    if( !( object instanceof AssertGroupBase ) )
186      return false;
187    if( !super.equals( object ) )
188      return false;
189
190    AssertGroupBase that = (AssertGroupBase) object;
191
192    if( size != that.size )
193      return false;
194    if( patternString != null ? !patternString.equals( that.patternString ) : that.patternString != null )
195      return false;
196
197    return true;
198    }
199
200  @Override
201  public int hashCode()
202    {
203    int result = super.hashCode();
204    result = 31 * result + ( patternString != null ? patternString.hashCode() : 0 );
205    result = 31 * result + (int) ( size ^ size >>> 32 );
206    return result;
207    }
208  }