Split Akka Stream Source into two

scala akka-stream

I have an Akka Streams Source which I want to split into two sources according to a predicate.

For example, given a source (types are intentionally simplified):

val source: Source[Either[Throwable, String], NotUsed] = ???

And two methods:

def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ???
def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ???

I would like to split the source according to the _.isRight predicate and pass the right part to the handleSuccess method and the left part to the handleFailure method.

I tried using the Broadcast fan-out stage, but it requires Sinks at the end.

Author: Tvaroh Posted: 18.07.2016 02:13

Answers (4)


9 votes

Although you can choose which side of the Source you want to retrieve items from, it's not possible to create a Source that yields two outputs, which seems to be what you ultimately want.

Given the GraphStage below, which splits the left and right values into two outputs...

/**
  * Fans out the left and right values of an Either:
  * Left values go to out0, Right values go to out1.
  * @tparam L left value type
  * @tparam R right value type
  */
class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
  import akka.stream.Attributes
  import akka.stream.stage.GraphStageLogic

  override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2[Either[L, R], L, R]("EitherFanOut")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    // Whether each outlet has requested an element that has not been pushed yet
    var out0demand = false
    var out1demand = false

    // Pull upstream only when both outlets have demand (and no pull is outstanding),
    // because the outlet for the next element is not known until it arrives
    def pullIfBothDemand(): Unit =
      if (out0demand && out1demand && !hasBeenPulled(shape.in)) pull(shape.in)

    setHandler(shape.in, new InHandler {
      override def onPush(): Unit =
        grab(shape.in) match {
          case Left(l) =>
            out0demand = false
            push(shape.out0, l)
          case Right(r) =>
            out1demand = false
            push(shape.out1, r)
        }
    })

    setHandler(shape.out0, new OutHandler {
      override def onPull(): Unit = {
        out0demand = true
        pullIfBothDemand()
      }
    })

    setHandler(shape.out1, new OutHandler {
      override def onPull(): Unit = {
        out1demand = true
        pullIfBothDemand()
      }
    })
  }
}

...you can route them so that only one side is received:

val sourceRight: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create(source) { implicit b => s =>
  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> Sink.ignore

  SourceShape(eitherFanOut.out1)
})

Await.result(sourceRight.runWith(Sink.foreach(println)), Duration.Inf)

...or, probably more desirable, route them to two separate Sinks:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>

  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> l.in
  eitherFanOut.out1 ~> r.in

  ClosedShape
})


val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

(Imports and initial setup)

import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, InHandler, OutHandler}
import akka.stream._
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val classLoader = getClass.getClassLoader
implicit val system = ActorSystem("QuickStart", ConfigFactory.load(classLoader), classLoader)
implicit val materializer = ActorMaterializer()

val values: List[Either[Throwable, String]] = List(
  Right("B"),
  Left(new Throwable),
  Left(new RuntimeException),
  Right("B"),
  Right("C"),
  Right("G"),
  Right("I"),
  Right("F"),
  Right("T"),
  Right("A")
)

val source: Source[Either[Throwable, String], NotUsed] = Source.fromIterator(() => values.toIterator)

Author: JRomero Posted: 18.07.2016 08:04

5 votes

For this you can use a Broadcast, then filter and map each of its outputs within the GraphDSL:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))


val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>

  import GraphDSL.Implicits._

  val broadcast = b.add(Broadcast[Either[Throwable, String]](2))

  s ~> broadcast.in
  // Every element reaches both branches; each branch keeps and unwraps only its own side
  broadcast.out(0).collect { case Left(t) => t } ~> l.in
  broadcast.out(1).collect { case Right(v) => v } ~> r.in

  ClosedShape
})


val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

I expect you will be able to run the functions you want from within the map.
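A hedged alternative that is not part of this answer: if you want the Right side as an ordinary Source (closer to the question's handleSuccess signature), Akka's divertTo / divertToMat operators send elements matching a predicate to a side Sink while the rest continue downstream. The sketch below assumes Akka 2.6 or later (so an implicit ActorSystem is enough to materialize streams); the sample data and the names failuresSink and rights are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Assumes Akka 2.6+: an implicit ActorSystem provides the Materializer
implicit val system: ActorSystem = ActorSystem("divert-example")

// Illustrative input standing in for the question's `source`
val source: Source[Either[Throwable, String], NotUsed] =
  Source(List[Either[Throwable, String]](Right("A"), Left(new RuntimeException("boom")), Right("B")))

// Hypothetical side Sink: unwraps the diverted Left values and collects them
val failuresSink: Sink[Either[Throwable, String], Future[Seq[Throwable]]] =
  Flow[Either[Throwable, String]].collect { case Left(t) => t }.toMat(Sink.seq)(Keep.right)

// Lefts are diverted to failuresSink; Rights continue downstream and are unwrapped.
// Keep.right makes the Future of collected failures the Source's materialized value.
val rights: Source[String, Future[Seq[Throwable]]] =
  source
    .divertToMat(failuresSink, _.isLeft)(Keep.right)
    .collect { case Right(s) => s }

// Run the Right side and keep both materialized values
val (failuresF, successesF) = rights.toMat(Sink.seq)(Keep.both).run()
val failures = Await.result(failuresF, 5.seconds)
val successes = Await.result(successesF, 5.seconds)
println(s"failures=${failures.map(_.getMessage)} successes=$successes")
system.terminate()
```

Unlike Broadcast, each element goes to exactly one side, so no filtering of the unwanted side is needed.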

Author: cessationoftime Posted: 28.09.2016 10:07

8 votes

This is implemented in akka-stream-contrib as PartitionWith. Add this dependency to your SBT build to pull it into your project:

// latest version available on https://github.com/akka/akka-stream-contrib/releases
libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"

PartitionWith is shaped like a Broadcast(2), but with potentially different types for each of the two outlets. You provide it with a function that maps each element to an Either; a Left result is routed to out0 and a Right result to out1. You can then attach a Sink or Flow to each of these outlets independently as appropriate. Building on cessationoftime's example, with the Broadcast replaced by a PartitionWith:

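import akka.stream.contrib.PartitionWith

val eitherSource: Source[Either[Throwable, String], NotUsed] = Source.empty
val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)
                                  ((_, _, _)) { implicit b => (s, l, r) =>

  import GraphDSL.Implicits._

  // The elements are already Eithers, so identity routes Left to out0 and Right to out1
  val pw = b.add(
    PartitionWith.apply[Either[Throwable, String], Throwable, String](identity)
  )

  // Wire the shapes passed into the build block (s, l, r), not the original values,
  // so that the materialized values returned by run() belong to the connected stages
  s ~> pw.in
  pw.out0 ~> l
  pw.out1 ~> r

  ClosedShape
})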

val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

Author: László van den Hoek Posted: 26.09.2017 12:23

1 vote

In the meantime, this has been introduced to standard Akka Streams: https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Partition.html.

You can split the input stream with Partition, giving it a function that returns the output port index for each element, and then use collect on each output to get only the types you are interested in.
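A minimal sketch of this approach, assuming Akka 2.6 or later (so an implicit ActorSystem is enough to materialize streams). Partition routes each element to the outlet index returned by its partitioner function, and collect on each outlet unwraps the Either. The sample data and the use of Sink.seq to gather results are illustrative only.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumes Akka 2.6+: an implicit ActorSystem provides the Materializer
implicit val system: ActorSystem = ActorSystem("partition-example")

// Illustrative input standing in for the question's `source`
val source: Source[Either[Throwable, String], NotUsed] =
  Source(List[Either[Throwable, String]](Right("A"), Left(new RuntimeException("boom")), Right("B")))

val graph = RunnableGraph.fromGraph(
  GraphDSL.create(Sink.seq[Throwable], Sink.seq[String])((_, _)) { implicit b => (lefts, rights) =>
    import GraphDSL.Implicits._

    // The partitioner returns an output port index: 0 for Left, 1 for Right
    val partition = b.add(
      Partition[Either[Throwable, String]](2, (e: Either[Throwable, String]) => if (e.isLeft) 0 else 1)
    )

    source ~> partition.in
    // Partition keeps the element type, so collect unwraps each side
    partition.out(0).collect { case Left(t) => t } ~> lefts
    partition.out(1).collect { case Right(s) => s } ~> rights

    ClosedShape
  })

// Materialized values: (Future of collected failures, Future of collected successes)
val (failuresF, successesF) = graph.run()
val failures = Await.result(failuresF, 5.seconds)
val successes = Await.result(successesF, 5.seconds)
println(s"failures=${failures.map(_.getMessage)} successes=$successes")
system.terminate()
```

Replacing the two Sink.seq sinks with your own sinks (or Flows) gives the same split as the Broadcast and PartitionWith examples, without filtering out the unwanted side on each branch.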

Author: agabor Posted: 27.09.2019 12:49