Connecting ZipWith Outlet to another ZipWith as Inlet

I couldn't connect the outlet of one ZipWith to the inlet of another ZipWith. The graph gets a single event from one of the Kafka topics, then the rest of the flow stops and no further events arrive. A single ZipWith over two Kafka data sources works fine; as soon as I introduce pmf and connect the outlets of pm1 and pm2 to the inlets of pmf, it stops working. Can you please help?



The problem only shows up when the data is retrieved from Kafka topics; the same graph works with dummy data sources such as Source(1 to 100), so the Kafka source seems to behave differently. All my successful tests were done with dummy data sources (a sketch of that working variant follows the code below).



package sample

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future

object KafkaApp extends App {
  implicit val system = ActorSystem("StreamBuilder2323")
  implicit val materializer = ActorMaterializer()

  // One plain-consumer source per topic.
  private val s1: Source[String, Consumer.Control] = initializeKafkaSource("topic1", system)
  private val s2: Source[String, Consumer.Control] = initializeKafkaSource("topic2", system)
  private val s3: Source[String, Consumer.Control] = initializeKafkaSource("topic3", system)
  private val s4: Source[String, Consumer.Control] = initializeKafkaSource("topic4", system)

  // Reusable two-inputs-to-one-output shape that concatenates the zipped elements.
  val concat = GraphDSL.create() { implicit b ⇒
    val zip = b.add(ZipWith[String, String, String](concatFunc _))
    UniformFanInShape(zip.out, zip.in0, zip.in1)
  }

  def concatFunc(s1: String, s2: String): String =
    s1 + " _ " + s2

  def printss(s: String): Unit =
    print(s)

  val sinkFinal = Sink.foreach(printss)

  val g = RunnableGraph.fromGraph(GraphDSL.create(sinkFinal) { implicit b ⇒ sink ⇒
    import GraphDSL.Implicits._
    val pm1 = b.add(concat)
    val pm2 = b.add(concat)
    val pmf = b.add(concat)

    // First level: zip the four Kafka sources pairwise.
    s1 ~> pm1.in(0)
    s2 ~> pm1.in(1)

    s3 ~> pm2.in(0)
    s4 ~> pm2.in(1)

    // Second level: feed the outlets of pm1 and pm2 into pmf.
    pm1.out ~> pmf.in(0)
    pm2.out ~> pmf.in(1)

    pmf.out ~> sink.in

    ClosedShape
  })

  val max: Future[Done] = g.run()

  def initializeKafkaSource(topicName: String, system: ActorSystem): Source[String, Consumer.Control] = {
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("consumerGroup")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val subscription = Subscriptions.topics(topicName)
    Consumer.plainSource(consumerSettings, subscription).map(_.value())
  }
}
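For comparison, below is a minimal sketch of the dummy-source variant mentioned above, which does produce output for me. The concat shape and the wiring are identical; only the four Kafka sources are replaced by Source(1 to 100) mapped to strings. The DummyApp object and the dummySource helper are illustrative names of mine, not part of the original code.

package sample

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}

object DummyApp extends App {
  implicit val system = ActorSystem("DummyStreamBuilder")
  implicit val materializer = ActorMaterializer()

  // Dummy replacement for the Kafka sources: a finite stream of labelled numbers.
  def dummySource(prefix: String): Source[String, NotUsed] =
    Source(1 to 100).map(i ⇒ s"$prefix-$i")

  // Same reusable 2-in / 1-out concatenating shape as in the question.
  val concat = GraphDSL.create() { implicit b ⇒
    val zip = b.add(ZipWith[String, String, String](_ + " _ " + _))
    UniformFanInShape(zip.out, zip.in0, zip.in1)
  }

  val g = RunnableGraph.fromGraph(GraphDSL.create(Sink.foreach[String](println)) { implicit b ⇒ sink ⇒
    import GraphDSL.Implicits._
    val pm1 = b.add(concat)
    val pm2 = b.add(concat)
    val pmf = b.add(concat)

    dummySource("t1") ~> pm1.in(0)
    dummySource("t2") ~> pm1.in(1)
    dummySource("t3") ~> pm2.in(0)
    dummySource("t4") ~> pm2.in(1)

    pm1.out ~> pmf.in(0)
    pm2.out ~> pmf.in(1)
    pmf.out ~> sink.in

    ClosedShape
  })

  g.run() // prints lines like "t1-1 _ t2-1 _ t3-1 _ t4-1"
}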

scala apache-kafka akka akka-stream
asked Dec 28 '18 at 19:31 by rernas35
edited Jan 2 at 21:31 by PrakashG

  • I've tried your code and it works as I expected: e.g. topic1 produces a, topic2 produces b, topic3 produces c and topic4 produces d, and I get a _ b _ c _ d.

    – gabrielgiussi
    Dec 29 '18 at 20:42

  • @gabrielgiussi Thanks for your reply. Is it possible to share the project with me? My sample doesn't work: it grabs some of the records from Kafka and doesn't process them. I am curious about the versions and configuration on your application side and on the broker side. Thanks.

    – rernas35
    Jan 7 at 8:08

  • What is the expected behavior and what behavior are you seeing? Please explain with a concrete example.

    – gabrielgiussi
    Jan 8 at 17:57
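Regarding the comment above that the sample "grabs some of the records from Kafka and doesn't process them": a ZipWith only emits once every one of its inlets has an element, so a single starving branch stalls the whole graph. The sketch below is my own debugging suggestion, not part of the original exchange; the KafkaDebugApp name and the per-topic log tags are assumptions. It runs each topic's source on its own, with the same consumer settings as initializeKafkaSource in the question, so a branch that never prints anything is the one starving the ZipWith chain.

package sample

import akka.actor.ActorSystem
import akka.event.Logging
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaDebugApp extends App {
  implicit val system = ActorSystem("KafkaDebug")
  implicit val materializer = ActorMaterializer()

  // Same settings as initializeKafkaSource in the question.
  private def kafkaSource(topic: String): Source[String, Consumer.Control] = {
    val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("consumerGroup")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    Consumer.plainSource(settings, Subscriptions.topics(topic)).map(_.value())
  }

  // Run each topic's source on its own: every topic should print records independently.
  Seq("topic1", "topic2", "topic3", "topic4").foreach { topic ⇒
    kafkaSource(topic)
      .log(topic) // tag log output per topic
      .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
      .runWith(Sink.foreach[String](v ⇒ println(s"$topic: $v")))
  }
}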