2017-08-06 3 views
0

この単純なカスケードプログラムを実行するのは苦労しています。何らかの理由でそれは何もしません。少なくとも私はそれがレコードを印刷することを期待しています。どんな助けもありがとう。ローカルモードで簡単なカスケーディングプログラムを実行する

package com.myLearning.cascading; 

import cascading.flow.Flow; 
import cascading.flow.FlowDef; 
import cascading.flow.local.LocalFlowConnector; 
import cascading.operation.Debug; 
import cascading.operation.expression.ExpressionFilter; 
import cascading.pipe.Each; 
import cascading.pipe.Pipe; 
import cascading.scheme.Scheme; 
import cascading.scheme.local.TextDelimited; 
import cascading.tap.SinkMode; 
import cascading.tap.Tap; 
import cascading.tap.local.FileTap; 
import cascading.tuple.Fields; 

public class operations_example 
{ 
    public static void main(String[] args) 
    { 
    Scheme sourceScheme = new TextDelimited(new Fields("username", "age"), true, ","); 
    String sourcePath = "C:/Users/Desktop/cascading/data/names.txt"; 
    Tap sourceTap = new FileTap(sourceScheme, sourcePath); 

    Scheme targetScheme = new TextDelimited(new Fields("username", "age"), true, ","); 
    String targetPath = "C:/Users/Desktop/cascading/data/output2.txt"; 
    Tap targetTap = new FileTap(targetScheme, targetPath, SinkMode.REPLACE); 

    Pipe dataPipe = new Pipe("data"); 
    dataPipe = new Each(dataPipe, new Debug()); 
    ExpressionFilter filter = new ExpressionFilter("age >= 30", Integer.TYPE); 

    dataPipe = new Each(dataPipe,new Fields("username","age"), filter); 

    FlowDef flowdef = FlowDef.flowDef(). 
      addSource(dataPipe, sourceTap). 
      addTailSink(dataPipe, targetTap); 

    Flow flow = new LocalFlowConnector().connect(flowdef); 
    flow.stop();  
    } 
} 

答えて

関連する問題