Unable to merge multiple streams into one
I am trying to merge the content of multiple files into a single file. To test my code, I have 3 files in my test dir:
file1.txt : this
file2.txt : is
file3.txt : test
Here is the code that combines all 3 files into one stream and writes it to a single "out.txt" file,
but this code only writes the content of one file to "out.txt". Why?
import java.io.{File, FileInputStream, FileOutputStream, InputStream, SequenceInputStream}
import scala.collection.JavaConverters.asJavaEnumeration

def mergeInputStreams(files: Iterator[File]): InputStream =
  new SequenceInputStream(asJavaEnumeration(files.map(new FileInputStream(_))))

val d = "/Users/pink/temp"
val file = new File(d)
//file.listFiles.iterator.foreach(println)
val fss = mergeInputStreams(file.listFiles.toIterator)
val outfile = new File("/Users/pink/tmp/out.txt")
val os = new FileOutputStream(outfile)
try {
  while (fss.available() > 0) {
    os.write(fss.read())
  }
} finally {
  fss.close()
  os.close()
}
I expect the above code to produce one file with the following content:
out.txt:
this is test
asked Dec 28 '18 at 14:42 by AKS
2 Answers
This behavior happens because fss.available() > 0 is the wrong check for such a task. The JavaDoc for InputStream.available() says (emphasis is mine):

Returns an estimate of the number of bytes that can be read (or skipped over) from this input stream without blocking by the next invocation of a method for this input stream.

The fact that fss.available() > 0 guarantees that the stream is not finished, but the reverse is not true: fss.available() might be 0 while you are still able to read more data. This is potentially true even for file-based InputStreams. For example, assume the file is actually located on a different server mounted via some network file system. In such a case, an implementation of available that returns the number of bytes cached on the client side is a reasonable implementation of the interface, because getting more data does require a blocking request over the network.

If you read the JavaDoc for SequenceInputStream.available() you will see:

...
This method simply calls available of the current underlying input stream and returns the result.

And this is probably the only sane implementation of the interface contract: in general, you can't distinguish in a non-blocking way between the case where is.available() == 0 because the end was reached and the case where more waiting or a blocking operation is needed to get more data.

The intent of the available method is to be used for various optimizations, not as a check for the end of the stream. The only proper check for whether an InputStream has reached its end is the potentially blocking read() == -1.
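This is easy to reproduce without files. In this small sketch (the in-memory streams are just stand-ins for the file streams), available() drops to 0 as soon as the first underlying stream is drained, even though the SequenceInputStream can still deliver more bytes:

```scala
import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
import scala.collection.JavaConverters.asJavaEnumeration

val s1 = new ByteArrayInputStream("ab".getBytes("UTF-8"))
val s2 = new ByteArrayInputStream("cd".getBytes("UTF-8"))
val seq = new SequenceInputStream(asJavaEnumeration(Iterator[InputStream](s1, s2)))

seq.read(); seq.read()                 // consume "ab" from the first stream
val availAfterDrain = seq.available()  // 0: only the *current* stream is consulted
val nextByte = seq.read()              // 'c': a read still succeeds
println(s"available=$availAfterDrain, next=${nextByte.toChar}")
```

A loop conditioned on available() > 0 would stop at exactly this point, after the first file, which matches the behavior in the question.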
Thanks for pointing out the issue with fss.available() > 0. I modified the code to use read() != -1 and it worked as expected:
var c = 0
try {
  while ({ c = fss.read(); c != -1 }) {
    os.write(c)
  }
} finally {
  fss.close()
  os.close()
}
– AKS, Jan 1 at 23:11
@AKS, please note that although this is technically a correct way, copying data 1 byte at a time is awfully inefficient. If you are going to handle big files, you should consider using one of the other two read methods, which take a buffer byte array, or even the java.nio package (take a look at Pipe).
– SergGr, Jan 1 at 23:34
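The buffered variant suggested in the comment can be sketched as follows. copyStream is a hypothetical helper name, not part of any library; the point is that read(buf) returns the number of bytes read, or -1 at the end of the stream:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

// Hypothetical helper: copy a stream using a buffer instead of
// byte-at-a-time reads; read(buf) == -1 is the only reliable EOF check.
def copyStream(in: InputStream, out: OutputStream): Unit = {
  val buf = new Array[Byte](8192)
  var n = in.read(buf)
  while (n != -1) {
    out.write(buf, 0, n)   // write only the n bytes actually read
    n = in.read(buf)
  }
}

val in  = new ByteArrayInputStream("this is test".getBytes("UTF-8"))
val out = new ByteArrayOutputStream()
copyStream(in, out)
println(out.toString("UTF-8"))   // prints "this is test"
```

The same loop works unchanged with the SequenceInputStream and FileOutputStream from the question.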
answered Dec 28 '18 at 16:15 by SergGr, edited Dec 28 '18 at 16:22
mergeInputStreams is working correctly, but the while loop that writes the InputStream to the FileOutputStream isn't getting all of the content. @SergGr's answer very clearly states why this is happening. Replacing the read-write while loop with the IOUtils copy method gets everything written to the file, but in the wrong order. The order of the output is determined by this line of code:
file.listFiles.toIterator
Once we sort the input files, we get this in out.txt:
this
is
test
Here's the code that does this:
import scala.collection.JavaConverters.asJavaEnumeration
import java.io.{File, FileInputStream, InputStream, FileOutputStream, SequenceInputStream, IOException}
import org.apache.commons.io.IOUtils

object Example1 {
  def main(args: Array[String]): Unit = {
    def mergeInputStreams(files: Iterator[File]): InputStream =
      new SequenceInputStream(asJavaEnumeration(files.map(new FileInputStream(_))))
    val d = "/Users/pink/temp"
    val file = new File(d)
    val fss = mergeInputStreams(file.listFiles.toList.sorted.toIterator)
    val os = new FileOutputStream(new File("/Users/pink/tmp/out.txt"))
    try {
      IOUtils.copy(fss, os)
    } catch {
      case e: IOException => println(s"IO exception: ${e.getMessage}")
    } finally {
      fss.close()
      os.close()
    }
  }
}
This solution definitely works, but we're using a lot of Java code where side effects happen without our type system knowing about them. Because of that we have to be careful to catch exceptions and place IO operations exactly where we want them.
A more functional way to write this would be with fs2 Streams. Here's the example from their README file, modified to match what your code is doing:
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2.{io, text, Stream}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object Example2 extends IOApp {
  private val blockingExecutionContext =
    Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))))(ec => IO(ec.shutdown()))

  val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC =>
    val f1 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file1.txt"), blockingEC, 4096)
    val f2 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file2.txt"), blockingEC, 4096)
    val f3 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file3.txt"), blockingEC, 4096)
    (f2 merge f3 merge f1)
      .through(text.utf8Decode)
      .through(text.lines)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get("/Users/pink/tmp/out.txt"), blockingEC))
  }

  def run(args: List[String]): IO[ExitCode] =
    converter.compile.drain.as(ExitCode.Success)
}
No IO operations happen until run is called, and all exceptions are contained inside the IO type, so we don't have to worry about catching them throughout our code.
answered Dec 28 '18 at 16:37 by codenoodle