2016-06-27 10 views

答えて

0

私はClouderaのCDH 5.7クラスタ内Scalaのを使用してハイブMySQLのからデータをインポートするSqoopを使用する必要がされたので、私はthis answerに従うことによって開始しました。

問題は、サーバーで実行されているときに正しい構成が得られていないことでした。

sqoop import --hive-import --connect "jdbc:mysql://host/db" \ 
--username "username" --password "password" --table "viewName" \ 
--hive-table "outputTable" -m 1 --check-column "dateColumnName" \ 
--last-value "lastMinDate" --incremental append 

だから、最後に、私はScalaのsys.process.ProcessBuilderを使用して外部プロセスとしてそれを実行することを選んだ:手動Sqoopを実行

はこのようなものでした。この方法では、SBTの依存関係は必要ありません。最後にランナーは次のように実装されました:

import sys.process._ 

def executeSqoop(connectionString: String, username: String, password: String, 
        viewName: String, outputTable: String, 
        dateColumnName: String, lastMinDate: String) = { 
    // To print every single line the process is writing into stdout and stderr respectively 
    val sqoopLogger = ProcessLogger(
    normalLine => log.debug(normalLine), 
    errorLine => errorLine match { 
     case line if line.contains("ERROR") => log.error(line) 
     case line if line.contains("WARN") => log.warning(line) 
     case line if line.contains("INFO") => log.info(line) 
     case line => log.debug(line) 
    } 
) 

    // Create Sqoop command, every parameter and value must be a separated String into the Seq 
    val command = Seq("sqoop", "import", "--hive-import", 
    "--connect", connectionString, 
    "--username", username, 
    "--password", password, 
    "--table", viewName, 
    "--hive-table", outputTable, 
    "-m", "1", 
    "--check-column", dateColumnName, 
    "--last-value", lastMinDate, 
    "--incremental", "append") 

    // result will contain the exit code of the command 
    val result = command ! sqoopLogger 
    if (result != 0) { 
    log.error("The Sqoop process did not finished successfully") 
    } else { 
    log.info("The Sqoop process finished successfully") 
    } 
} 
関連する問題