2016-08-17 16 views
1

現在、自動検出サービスでクラスタ化されたplay + akkaの実装を実装しようとしています。しかし、私は遊びに含まれているGuice DIローダーの問題に遭遇しているようです。そのドキュメントの状態からの抜粋:再生フレームワークを使用したAkkaクラスタの設定

https://www.playframework.com/documentation/2.5.x/ScalaAkka#Integrating-with-Akka

我々はライフサイクルフックなど、そのような正しいクラスローダなどのすべてを設定して、あなたは、俳優のシステムに組み込まれて使用をお勧めしますが、あなたを止めるものは何もありませんあなた自身の俳優システムを使用することから。

Playがシャットダウンしたときにアクターシステムをシャットダウンするためのストップフックを登録してください。 プレイ環境から正しいクラスローダーを渡す必要があります。そうしないと、Akkaはあなたのアプリケーションを見つけることができませんクラス

play.akka.configを使用してPlayの読み込み場所を変更するか、デフォルトのakka設定からakka設定を読み込まないようにしてくださいシステムは同じリモートポートにバインドしようとします

私はd私はそれがちょうど今、しかし、これを回避するために、私自身のGuiceApplicationBuilderを作成しようとした

class BuiltinModule extends Module { 
def bindings(env: Environment, configuration: Configuration): Seq[Binding[_]] = 

    { 
     def dynamicBindings(factories: ((Environment, Configuration) => Seq[Binding[_]])*) = { 
      factories.flatMap(_(env, configuration)) 
     } 

     Seq(
      bind[Environment] to env, 
      bind[ConfigurationProvider].to(new ConfigurationProvider(configuration)), 
      bind[Configuration].toProvider[ConfigurationProvider], 
      bind[HttpConfiguration].toProvider[HttpConfiguration.HttpConfigurationProvider], 

      // Application lifecycle, bound both to the interface, and its implementation, so that Application can access it 
      // to shut it down. 
      bind[DefaultApplicationLifecycle].toSelf, 
      bind[ApplicationLifecycle].to(bind[DefaultApplicationLifecycle]), 

      bind[Application].to[DefaultApplication], 
      bind[play.Application].to[play.DefaultApplication], 

      bind[Router].toProvider[RoutesProvider], 
      bind[play.routing.Router].to[JavaRouterAdapter], 
      bind[ActorSystem].toProvider[ActorSystemProvider], 
      bind[Materializer].toProvider[MaterializerProvider], 
      bind[ExecutionContextExecutor].toProvider[ExecutionContextProvider], 
      bind[ExecutionContext].to[ExecutionContextExecutor], 
      bind[Executor].to[ExecutionContextExecutor], 
      bind[HttpExecutionContext].toSelf, 

      bind[CryptoConfig].toProvider[CryptoConfigParser], 
      bind[CookieSigner].toProvider[CookieSignerProvider], 
      bind[CSRFTokenSigner].toProvider[CSRFTokenSignerProvider], 
      bind[AESCrypter].toProvider[AESCrypterProvider], 
      bind[play.api.libs.Crypto].toSelf, 
      bind[TemporaryFileCreator].to[DefaultTemporaryFileCreator] 
     ) ++ dynamicBindings(
      HttpErrorHandler.bindingsFromConfiguration, 
      HttpFilters.bindingsFromConfiguration, 
      HttpRequestHandler.bindingsFromConfiguration, 
      ActionCreator.bindingsFromConfiguration 
     ) 
     } 
    } 

:彼らはしかし、私はプレーを回避することができないようお勧めします1つの上記の構成はまだそれがBuiltInModuleから内部ActorSystemProviderのバインディング代わりにBuiltInModuleから重複バインディング例外を移動します。

ここで私がしようとしているものです:

AkkaConfigModule:

package module.akka 

import com.google.inject.{AbstractModule, Inject, Provider, Singleton} 
import com.typesafe.config.Config 
import module.akka.AkkaConfigModule.AkkaConfigProvider 
import net.codingwell.scalaguice.ScalaModule 
import play.api.Application 

/** 
    * Created by dmcquill on 8/15/16. 
    */ 
object AkkaConfigModule { 
    @Singleton 
    class AkkaConfigProvider @Inject() (application: Application) extends Provider[Config] { 
     override def get() = { 
      val classLoader = application.classloader 
      NodeConfigurator.loadConfig(classLoader) 
     } 
    } 
} 

/** 
    * Binds the application configuration to the [[Config]] interface. 
    * 
    * The config is bound as an eager singleton so that errors in the config are detected 
    * as early as possible. 
    */ 
class AkkaConfigModule extends AbstractModule with ScalaModule { 

    override def configure() { 
     bind[Config].toProvider[AkkaConfigProvider].asEagerSingleton() 
    } 

} 

ActorSystemModule:

package module.akka 


import actor.cluster.ClusterMonitor 
import akka.actor.ActorSystem 
import com.google.inject._ 
import com.typesafe.config.Config 
import net.codingwell.scalaguice.ScalaModule 
import play.api.inject.ApplicationLifecycle 

import scala.collection.JavaConversions._ 

/** 
    * Created by dmcquill on 7/27/16. 
    */ 
object ActorSystemModule { 
    @Singleton 
    class ActorSystemProvider @Inject() (val lifecycle: ApplicationLifecycle, val config: Config, val injector: Injector) extends Provider[ActorSystem] { 
     override def get() = { 
      val system = ActorSystem(config.getString(NodeConfigurator.CLUSTER_NAME_PROP), config.getConfig("fitnessApp")) 

      // add the GuiceAkkaExtension to the system, and initialize it with the Guice injector 
      GuiceAkkaExtension(system).initialize(injector) 

      system.log.info("Configured seed nodes: " + config.getStringList("fitnessApp.akka.cluster.seed-nodes").mkString(", ")) 
      system.actorOf(GuiceAkkaExtension(system).props(ClusterMonitor.name)) 

      lifecycle.addStopHook {() => 
       system.terminate() 
      } 

      system 
     } 
    } 
} 

/** 
    * A module providing an Akka ActorSystem. 
    */ 
class ActorSystemModule extends AbstractModule with ScalaModule { 
    import module.akka.ActorSystemModule.ActorSystemProvider 

    override def configure() { 
     bind[ActorSystem].toProvider[ActorSystemProvider].asEagerSingleton() 
    } 
} 

アプリケーションローダー:

class CustomApplicationLoader extends GuiceApplicationLoader { 

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = { 
     initialBuilder 
      .overrides(overrides(context): _*) 
      .bindings(new AkkaConfigModule, new ActorSystemModule) 
    } 

} 

私は旧姓主なものを達成するためには、AkkaクラスターのシードノードをプログラムでロードできるようにActorSystemを構成します。

上記のアプローチが適切なアプローチですか、これを達成する良い方法がありますか?これが正しいアプローチであれば、私は基本的に遊び/ guiceのDI設定について理解していませんか?

更新

このアーキテクチャでは、プレイ+アッカは、同じノード上に配置されています。

答えて

2

結局私は必要以上に複雑なことをやろうとしました。上記のフローを実行する代わりに、初期構成をプログラムで拡張して、必要なネットワーク情報をプログラムで取得できるようにしました。

NodeConfigurator:このクラスは、application.confからプロパティを取得し、その後、プログラムと組み合わせて使用​​するように設定を作成するために使用される、関連するユーティリティメソッドが含まれてい

最終結果は、本質的にいくつかのクラスで構成されkubernetes発見サービス。

object NodeConfigurator { 

    /** 
     * This method given a class loader will return the configuration object for an ActorSystem 
     * in a clustered environment 
     * 
     * @param classLoader the configured classloader of the application 
     * @return Config 
     */ 
    def loadConfig(classLoader: ClassLoader) = { 
     val config = ConfigFactory.load(classLoader) 

     val clusterName = config.getString(CLUSTER_NAME_PROP) 
     val seedPort = config.getString(SEED_PORT_CONF_PROP) 

     val host = if (config.getString(HOST_CONF_PROP) equals "eth0-address-or-localhost") { 
      getLocalHostAddress.getOrElse(DEFAULT_HOST_ADDRESS) 
     } else { 
      config.getString(HOST_CONF_PROP) 
     } 

     ConfigFactory.parseString(formatSeedNodesConfig(clusterName, getSeedNodes(config), seedPort, host)) 
      .withValue(HOST_CONF_PROP, ConfigValueFactory.fromAnyRef(host)) 
      .withValue("fitnessApp.akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(host)) 
      .withFallback(config) 
      .resolve() 
    } 

    /** 
     * Get the local ip address which defaults to localhost if not 
     * found on the eth0 adapter 
     * 
     * @return Option[String] 
     */ 
    def getLocalHostAddress: Option[String] = { 
     import java.net.NetworkInterface 

     import scala.collection.JavaConversions._ 

     NetworkInterface.getNetworkInterfaces 
      .find(_.getName equals "eth0") 
      .flatMap { interface => 
       interface.getInetAddresses.find(_.isSiteLocalAddress).map(_.getHostAddress) 
      } 
    } 

    /** 
     * Retrieves a set of seed nodes that are currently running in our cluster 
     * 
     * @param config akka configuration object 
     * @return Array[String] 
     */ 
    def getSeedNodes(config: Config) = { 
     if(config.hasPath(SEED_NODES_CONF_PROP)) { 
      config.getString(SEED_NODES_CONF_PROP).split(",").map(_.trim) 
     } else { 
      Array.empty[String] 
     } 
    } 

    /** 
     * formats the seed node addresses in the proper format 
     * 
     * @param clusterName name of akka cluster 
     * @param seedNodeAddresses listing of current seed nodes 
     * @param seedNodePort configured seed node port 
     * @param defaultSeedNodeAddress default seed node address 
     * @return 
     */ 
    def formatSeedNodesConfig(clusterName: String, seedNodeAddresses: Array[String], seedNodePort: String, defaultSeedNodeAddress: String) = { 
     if(seedNodeAddresses.isEmpty) { 
      s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://[email protected]$defaultSeedNodeAddress:$seedNodePort" ]""" 
     } else { 
      seedNodeAddresses.map { address => 
       s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://[email protected]$address:$seedNodePort"""" 
      }.mkString("\n") 
     } 
    } 

    val CLUSTER_NAME_PROP = "fitnessAkka.cluster-name" 
    val HOST_CONF_PROP = "fitnessAkka.host" 
    val PORT_CONF_PROP = "fitnessAkka.port" 
    val SEED_NODES_CONF_PROP = "fitnessAkka.seed-nodes" 
    val SEED_PORT_CONF_PROP = "fitnessAkka.seed-port" 

    private val DEFAULT_HOST_ADDRESS = "127.0.0.1" 
} 

CustomApplicationLoaderは:単純にNodeConfiguratorからから生成設定に取ると、それをinitialConfigurationを拡張するために遊びのオーバーライドアプリケーションローダを使用しています。

class CustomApplicationLoader extends GuiceApplicationLoader { 

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = { 
     val classLoader = context.environment.classLoader 
     val configuration = Configuration(NodeConfigurator.loadConfig(classLoader)) 

     initialBuilder 
       .in(context.environment) 
       .loadConfig(context.initialConfiguration ++ configuration) 
       .overrides(overrides(context): _*) 
    } 

} 

AkkaActorModule:クラスタのメンバーを表示するためのAPIで使用するための依存関係の注射俳優REFを提供します。

class AkkaActorModule extends AbstractModule with AkkaGuiceSupport { 
    def configure = { 
     bindActor[ClusterMonitor]("cluster-monitor") 
    } 
} 

ClusterMonitor:これは単にイベントをクラスタ化するために待機している俳優で、さらに現在のクラスタの状態を生成するために、メッセージを受け取ります。

class ClusterMonitor @Inject() extends Actor with ActorLogging { 
    import actor.cluster.ClusterMonitor.GetClusterState 

    val cluster = Cluster(context.system) 
    private var nodes = Set.empty[Address] 

    override def preStart(): Unit = { 
     cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) 
    } 

    override def postStop(): Unit = cluster.unsubscribe(self) 

    override def receive = { 
     case MemberUp(member) => { 
      nodes += member.address 
      log.info(s"Cluster member up: ${member.address}") 
     } 
     case UnreachableMember(member) => log.warning(s"Cluster member unreachable: ${member.address}") 
     case MemberRemoved(member, previousStatus) => { 
      nodes -= member.address 
      log.info(s"Cluster member removed: ${member.address}") 
     } 
     case MemberExited(member) => log.info(s"Cluster member exited: ${member.address}") 
     case GetClusterState => sender() ! nodes 
     case _: MemberEvent => 
    } 

} 

object ClusterMonitor { 
    case class GetClusterState() 
} 

アプリケーション:上記コントローラの出力を単にテストコントローラクラスタに参加しているノードのリスト

class Application @Inject() (@Named("cluster-monitor") clusterMonitorRef: ActorRef) extends Controller { 

    implicit val addressWrites = new Writes[Address] { 
     def writes(address: Address) = Json.obj(
      "host" -> address.host, 
      "port" -> address.port, 
      "protocol" -> address.protocol, 
      "system" -> address.system 
     ) 
    } 

    implicit val timeout = Timeout(5, TimeUnit.SECONDS) 

    def listClusterNodes = Action.async { 
     (clusterMonitorRef ? GetClusterState).mapTo[Set[Address]].map { addresses => 
      Ok(Json.toJson(addresses)) 
     } 
    } 

} 

結果は、以下のような出力を生成する:

$ http GET 192.168.99.100:30760/cluster/nodes 

HTTP/1.1 200 OK 
Content-Length: 235 
Content-Type: application/json 
Date: Thu, 18 Aug 2016 02:50:30 GMT 

[ 
    { 
     "host": "172.17.0.3", 
     "port": 2551, 
     "protocol": "akka.tcp", 
     "system": "fitnessApp" 
    }, 
    { 
     "host": "172.17.0.4", 
     "port": 2551, 
     "protocol": "akka.tcp", 
     "system": "fitnessApp" 
    }, 
    { 
     "host": "172.17.0.5", 
     "port": 2551, 
     "protocol": "akka.tcp", 
     "system": "fitnessApp" 
    } 
] 
0

lightbendの素晴らしい例があります。http://www.lightbend.com/activator/template/play-akka-cluster-sample サンプルサンプルをダウンロードして再利用できます。

+0

最初にこのテンプレートを作成したときにそのテンプレートを調べました。しかし、私が試しているアーキテクチャーは、同じノード上のplay + akkaであり、その結果、各プレイノードも内部にアクターシステムを持っています。 – dmcqu314

関連する問題