2016-02-25 13 views
11

Spark SQL DataFramesとJDBC接続を使用して、MySqlの一部のデータを挿入および更新しようとしています。SPARK SQL - DataFramesとJDBCを使用してMySqlテーブルを更新します。

SaveMode.Appendを使用して新しいデータを挿入することに成功しました。 MySqlテーブルに既に存在するデータをSpark SQLから更新する方法はありますか?

挿入する私のコードは次のとおりです。

myDataFrame.write.mode(SaveMode.Append).jdbc(JDBCurl,mySqlTable,connectionProperties)

私はそれが完全なテーブルを削除し、新しいものを作成しSaveMode.Overwriteするように変更した場合、私は「ON DUPLICATE KEY UPDATEのようなものを探しています"MySqlで利用可能

答えて

14

これはできません。今のよう(1.6.0/2.2.0 SNAPSHOTスパーク)スパークDataFrameWriterは4つしか書き込みモードをサポートしています。

  • SaveMode.Overwrite:既存のデータを上書きします。
  • SaveMode.Append:データを追加します。
  • SaveMode.Ignore:操作を無視します(つまり、操作なし)。
  • SaveMode.ErrorIfExists:デフォルトオプションは、実行時に例外をスローします。

あなたは(あなたがUPSERT操作が冪等する必要がありますしたいので、実装が、このような簡単なもの)mapPartitionsを使用して、たとえば、手動で挿入し、一時テーブルに書き込むと、手動でアップサート実行、またはトリガーを使用することができます。

一般に、バッチ操作のためのアップサット動作を達成し、まともなパフォーマンスを維持することは、些細なこととはまったく異なります。一般的には、複数の同時トランザクションが各パーティションに1つずつ存在するため、(通常はアプリケーション固有のパーティショニングを使用して)書き込み競合が発生しないようにするか、適切な回復手順を提供する必要があることを覚えておく必要があります。実際には、実行して一時表に一括書き込みし、upsert部分を直接データベース内で解決する方がよい場合があります。 MySQLのテーブル内のデータを更新するために、 https://pypi.python.org/pypi/JayDeBeApi/

0

zero323の答えは、私はちょうどあなたがこの問題を回避するためにJayDeBeApiパッケージを使用することができることを追加したい、権利です。すでにmysql jdbcドライバがインストールされているので、これはあまり意味がありません。

JayDeBeApiモジュールを使用すると、Java JDBCを使用してPythonコードから データベースに接続することができます。これは、 データベースにPython DB-API v2.0を提供します。

私たちはPythonのAnacondaディストリビューションを使用しており、JayDeBeApi pythonパッケージが標準で用意されています。

上記リンクの例を参照してください。

0

アップサーティングのような非常に一般的なケースのために、スパークにはSaveMode.Upsertモードがないことが残念です。

zero322が一般的ですが、そのような置き換え機能を提供するには(パフォーマンスを犠牲にして)可能でなければならないと思います。

また、このケースにいくつかのJavaコードを提供したいと考えました。 もちろん、スパークの組み込みのものほどパフォーマンスが良いわけではありませんが、それはあなたの必要条件の良い基礎になるはずです。ニーズに合わせて変更してください:

myDF.repartition(20); //one connection per partition, see below 

myDF.foreachPartition((Iterator<Row> t) -> { 
      Connection conn = DriverManager.getConnection(
        Constants.DB_JDBC_CONN, 
        Constants.DB_JDBC_USER, 
        Constants.DB_JDBC_PASS); 

      conn.setAutoCommit(true); 
      Statement statement = conn.createStatement(); 

      final int batchSize = 100000; 
      int i = 0; 
      while (t.hasNext()) { 
       Row row = t.next(); 
       try { 
        // better than REPLACE INTO, less cycles 
        statement.addBatch(("INSERT INTO mytable " + "VALUES (" 
          + "'" + row.getAs("_id") + "', 
          + "'" + row.getStruct(1).get(0) + "' 
          + "') ON DUPLICATE KEY UPDATE _id='" + row.getAs("_id") + "';")); 
        //conn.commit(); 

        if (++i % batchSize == 0) { 
         statement.executeBatch(); 
        } 
       } catch (SQLIntegrityConstraintViolationException e) { 
        //should not occur, nevertheless 
        //conn.commit(); 
       } catch (SQLException e) { 
        e.printStackTrace(); 
       } finally { 
        //conn.commit(); 
        statement.executeBatch(); 
       } 
      } 
      int[] ret = statement.executeBatch(); 

      System.out.println("Ret val: " + Arrays.toString(ret)); 
      System.out.println("Update count: " + statement.getUpdateCount()); 
      conn.commit(); 

      statement.close(); 
      conn.close(); 
関連する問題