Connecting ZipWith Outlet to another ZipWith as Inlet
I couldn't define a ZipWith out
as In
of another ZipWith
. It only gets single event from one of the kafka topics
then the rest of flow doesn't work and it doesn't get any other event. Single zipWith
is working for 2 kafka
datasources
. Whenever I introduce the pmf
and connect pm1
and pm2's
outlets to inlet of pmf
, it doesn't work :( Can you please help ?
Data is retrieved from Kafka topics
, it works with datasources
like Source(1 to 100). Kafka datasource
has different behaviour all my tests are done with dummy datasources
.
package sample
import akka.{Done}
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
object KafkaApp extends App {
implicit val system = ActorSystem("StreamBuilder2323")
implicit val materializer = ActorMaterializer()
private val s1: Source[String, Consumer.Control] = initializeKafkaSource("topic1",system)
private val s2: Source[String, Consumer.Control] = initializeKafkaSource("topic2",system)
private val s3: Source[String, Consumer.Control] = initializeKafkaSource("topic3",system)
private val s4:Source[String, Consumer.Control] = initializeKafkaSource("topic4",system)
val concat = GraphDSL.create() { implicit b ⇒
val zip = b.add(ZipWith[String,String,String](concatFunc _))
UniformFanInShape(zip.out, zip.in0, zip.in1)
}
def concatFunc(s1:String, s2:String): String ={
s1 + " _ " + s2
}
def printss(s:String): Unit ={
print(s)
}
val sinkFinal = Sink.foreach(printss)
val g = RunnableGraph.fromGraph(GraphDSL.create(sinkFinal) { implicit b ⇒ sink ⇒
import GraphDSL.Implicits._
val pm1 = b.add(concat)
val pm2 = b.add(concat)
val pmf = b.add(concat)
s1 ~> pm1.in(0)
s2 ~> pm1.in(1)
s3 ~> pm2.in(0)
s4 ~> pm2.in(1)
pm1.out ~> pmf.in(0)
pm2.out ~> pmf.in(1)
pmf.out ~> sink.in
ClosedShape
})
val max: Future[Done] = g.run()
def initializeKafkaSource(topicName : String, system : ActorSystem): Source[String,Consumer.Control] ={
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("consumerGroup")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val subscription = Subscriptions.topics(topicName)
val streamSource = Consumer.plainSource(consumerSettings, subscription).map(s => s.value())
streamSource
}
}
scala apache-kafka akka akka-stream
add a comment |
I couldn't define a ZipWith out
as In
of another ZipWith
. It only gets single event from one of the kafka topics
then the rest of flow doesn't work and it doesn't get any other event. Single zipWith
is working for 2 kafka
datasources
. Whenever I introduce the pmf
and connect pm1
and pm2's
outlets to inlet of pmf
, it doesn't work :( Can you please help ?
Data is retrieved from Kafka topics
, it works with datasources
like Source(1 to 100). Kafka datasource
has different behaviour all my tests are done with dummy datasources
.
package sample
import akka.{Done}
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
object KafkaApp extends App {
implicit val system = ActorSystem("StreamBuilder2323")
implicit val materializer = ActorMaterializer()
private val s1: Source[String, Consumer.Control] = initializeKafkaSource("topic1",system)
private val s2: Source[String, Consumer.Control] = initializeKafkaSource("topic2",system)
private val s3: Source[String, Consumer.Control] = initializeKafkaSource("topic3",system)
private val s4:Source[String, Consumer.Control] = initializeKafkaSource("topic4",system)
val concat = GraphDSL.create() { implicit b ⇒
val zip = b.add(ZipWith[String,String,String](concatFunc _))
UniformFanInShape(zip.out, zip.in0, zip.in1)
}
def concatFunc(s1:String, s2:String): String ={
s1 + " _ " + s2
}
def printss(s:String): Unit ={
print(s)
}
val sinkFinal = Sink.foreach(printss)
val g = RunnableGraph.fromGraph(GraphDSL.create(sinkFinal) { implicit b ⇒ sink ⇒
import GraphDSL.Implicits._
val pm1 = b.add(concat)
val pm2 = b.add(concat)
val pmf = b.add(concat)
s1 ~> pm1.in(0)
s2 ~> pm1.in(1)
s3 ~> pm2.in(0)
s4 ~> pm2.in(1)
pm1.out ~> pmf.in(0)
pm2.out ~> pmf.in(1)
pmf.out ~> sink.in
ClosedShape
})
val max: Future[Done] = g.run()
def initializeKafkaSource(topicName : String, system : ActorSystem): Source[String,Consumer.Control] ={
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("consumerGroup")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val subscription = Subscriptions.topics(topicName)
val streamSource = Consumer.plainSource(consumerSettings, subscription).map(s => s.value())
streamSource
}
}
scala apache-kafka akka akka-stream
I've tried your code and it works as I expected... e.g. topic1 produces a, topic2 b, topic3 c and topic4 e and I get a _ b _ c _ d.
– gabrielgiussi
Dec 29 '18 at 20:42
@gabrielgiussi thanks for your reply . is it possible to share the project with me. Because my sample doesn't work for me. It grabs some of records from kafka and doesn't process them. I am curious about your versions and configuration on your application side and broker side. Thanks
– rernas35
Jan 7 at 8:08
What is the expected behavior and what behavior are you seeing? Please explain with a concrete example.
– gabrielgiussi
Jan 8 at 17:57
add a comment |
I couldn't define a ZipWith out
as In
of another ZipWith
. It only gets single event from one of the kafka topics
then the rest of flow doesn't work and it doesn't get any other event. Single zipWith
is working for 2 kafka
datasources
. Whenever I introduce the pmf
and connect pm1
and pm2's
outlets to inlet of pmf
, it doesn't work :( Can you please help ?
Data is retrieved from Kafka topics
, it works with datasources
like Source(1 to 100). Kafka datasource
has different behaviour all my tests are done with dummy datasources
.
package sample
import akka.{Done}
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
object KafkaApp extends App {
implicit val system = ActorSystem("StreamBuilder2323")
implicit val materializer = ActorMaterializer()
private val s1: Source[String, Consumer.Control] = initializeKafkaSource("topic1",system)
private val s2: Source[String, Consumer.Control] = initializeKafkaSource("topic2",system)
private val s3: Source[String, Consumer.Control] = initializeKafkaSource("topic3",system)
private val s4:Source[String, Consumer.Control] = initializeKafkaSource("topic4",system)
val concat = GraphDSL.create() { implicit b ⇒
val zip = b.add(ZipWith[String,String,String](concatFunc _))
UniformFanInShape(zip.out, zip.in0, zip.in1)
}
def concatFunc(s1:String, s2:String): String ={
s1 + " _ " + s2
}
def printss(s:String): Unit ={
print(s)
}
val sinkFinal = Sink.foreach(printss)
val g = RunnableGraph.fromGraph(GraphDSL.create(sinkFinal) { implicit b ⇒ sink ⇒
import GraphDSL.Implicits._
val pm1 = b.add(concat)
val pm2 = b.add(concat)
val pmf = b.add(concat)
s1 ~> pm1.in(0)
s2 ~> pm1.in(1)
s3 ~> pm2.in(0)
s4 ~> pm2.in(1)
pm1.out ~> pmf.in(0)
pm2.out ~> pmf.in(1)
pmf.out ~> sink.in
ClosedShape
})
val max: Future[Done] = g.run()
def initializeKafkaSource(topicName : String, system : ActorSystem): Source[String,Consumer.Control] ={
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("consumerGroup")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val subscription = Subscriptions.topics(topicName)
val streamSource = Consumer.plainSource(consumerSettings, subscription).map(s => s.value())
streamSource
}
}
scala apache-kafka akka akka-stream
I couldn't define a ZipWith out
as In
of another ZipWith
. It only gets single event from one of the kafka topics
then the rest of flow doesn't work and it doesn't get any other event. Single zipWith
is working for 2 kafka
datasources
. Whenever I introduce the pmf
and connect pm1
and pm2's
outlets to inlet of pmf
, it doesn't work :( Can you please help ?
Data is retrieved from Kafka topics
, it works with datasources
like Source(1 to 100). Kafka datasource
has different behaviour all my tests are done with dummy datasources
.
package sample
import akka.{Done}
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
object KafkaApp extends App {
implicit val system = ActorSystem("StreamBuilder2323")
implicit val materializer = ActorMaterializer()
private val s1: Source[String, Consumer.Control] = initializeKafkaSource("topic1",system)
private val s2: Source[String, Consumer.Control] = initializeKafkaSource("topic2",system)
private val s3: Source[String, Consumer.Control] = initializeKafkaSource("topic3",system)
private val s4:Source[String, Consumer.Control] = initializeKafkaSource("topic4",system)
val concat = GraphDSL.create() { implicit b ⇒
val zip = b.add(ZipWith[String,String,String](concatFunc _))
UniformFanInShape(zip.out, zip.in0, zip.in1)
}
def concatFunc(s1:String, s2:String): String ={
s1 + " _ " + s2
}
def printss(s:String): Unit ={
print(s)
}
val sinkFinal = Sink.foreach(printss)
val g = RunnableGraph.fromGraph(GraphDSL.create(sinkFinal) { implicit b ⇒ sink ⇒
import GraphDSL.Implicits._
val pm1 = b.add(concat)
val pm2 = b.add(concat)
val pmf = b.add(concat)
s1 ~> pm1.in(0)
s2 ~> pm1.in(1)
s3 ~> pm2.in(0)
s4 ~> pm2.in(1)
pm1.out ~> pmf.in(0)
pm2.out ~> pmf.in(1)
pmf.out ~> sink.in
ClosedShape
})
val max: Future[Done] = g.run()
def initializeKafkaSource(topicName : String, system : ActorSystem): Source[String,Consumer.Control] ={
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("consumerGroup")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val subscription = Subscriptions.topics(topicName)
val streamSource = Consumer.plainSource(consumerSettings, subscription).map(s => s.value())
streamSource
}
}
scala apache-kafka akka akka-stream
scala apache-kafka akka akka-stream
edited Jan 2 at 21:31
PrakashG
450514
450514
asked Dec 28 '18 at 19:31
rernas35rernas35
63
63
I've tried your code and it works as I expected... e.g. topic1 produces a, topic2 b, topic3 c and topic4 e and I get a _ b _ c _ d.
– gabrielgiussi
Dec 29 '18 at 20:42
@gabrielgiussi thanks for your reply . is it possible to share the project with me. Because my sample doesn't work for me. It grabs some of records from kafka and doesn't process them. I am curious about your versions and configuration on your application side and broker side. Thanks
– rernas35
Jan 7 at 8:08
What is the expected behavior and what behavior are you seeing? Please explain with a concrete example.
– gabrielgiussi
Jan 8 at 17:57
add a comment |
I've tried your code and it works as I expected... e.g. topic1 produces a, topic2 b, topic3 c and topic4 e and I get a _ b _ c _ d.
– gabrielgiussi
Dec 29 '18 at 20:42
@gabrielgiussi thanks for your reply . is it possible to share the project with me. Because my sample doesn't work for me. It grabs some of records from kafka and doesn't process them. I am curious about your versions and configuration on your application side and broker side. Thanks
– rernas35
Jan 7 at 8:08
What is the expected behavior and what behavior are you seeing? Please explain with a concrete example.
– gabrielgiussi
Jan 8 at 17:57
I've tried your code and it works as I expected... e.g. topic1 produces a, topic2 b, topic3 c and topic4 e and I get a _ b _ c _ d.
– gabrielgiussi
Dec 29 '18 at 20:42
I've tried your code and it works as I expected... e.g. topic1 produces a, topic2 b, topic3 c and topic4 e and I get a _ b _ c _ d.
– gabrielgiussi
Dec 29 '18 at 20:42
@gabrielgiussi thanks for your reply . is it possible to share the project with me. Because my sample doesn't work for me. It grabs some of records from kafka and doesn't process them. I am curious about your versions and configuration on your application side and broker side. Thanks
– rernas35
Jan 7 at 8:08
@gabrielgiussi thanks for your reply . is it possible to share the project with me. Because my sample doesn't work for me. It grabs some of records from kafka and doesn't process them. I am curious about your versions and configuration on your application side and broker side. Thanks
– rernas35
Jan 7 at 8:08
What is the expected behavior and what behavior are you seeing? Please explain with a concrete example.
– gabrielgiussi
Jan 8 at 17:57
What is the expected behavior and what behavior are you seeing? Please explain with a concrete example.
– gabrielgiussi
Jan 8 at 17:57
add a comment |
0
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53963459%2fconnecting-zipwith-outlet-to-another-zipwith-as-inlet%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
0
active
oldest
votes
0
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53963459%2fconnecting-zipwith-outlet-to-another-zipwith-as-inlet%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
I've tried your code and it works as I expected... e.g. topic1 produces a, topic2 b, topic3 c and topic4 e and I get a _ b _ c _ d.
– gabrielgiussi
Dec 29 '18 at 20:42
@gabrielgiussi thanks for your reply . is it possible to share the project with me. Because my sample doesn't work for me. It grabs some of records from kafka and doesn't process them. I am curious about your versions and configuration on your application side and broker side. Thanks
– rernas35
Jan 7 at 8:08
What is the expected behavior and what behavior are you seeing? Please explain with a concrete example.
– gabrielgiussi
Jan 8 at 17:57