Unable to merge multiple streams into one



























I am trying to merge the content of multiple files into a single file. To test my code, I have 3 files in my test dir:

file1.txt : this
file2.txt : is
file3.txt : test

Here is the code that combines all 3 files into one stream and writes it to a single "out.txt" file. But this code only writes the content of one file to "out.txt". Why?



import java.io.{File, FileInputStream, FileOutputStream, InputStream, SequenceInputStream}
import scala.collection.JavaConverters.asJavaEnumeration

def mergeInputStreams(files: Iterator[File]): InputStream =
  new SequenceInputStream(asJavaEnumeration(files.map(new FileInputStream(_))))

val d = "/Users/pink/temp"
val file = new File(d)
//file.listFiles.iterator.foreach(println)
val fss = mergeInputStreams(file.listFiles.toIterator)
val outfile = new File("/Users/pink/tmp/out.txt")
val os = new FileOutputStream(outfile)
try {
  while (fss.available() > 0) {
    os.write(fss.read())
  }
} finally {
  fss.close()
  os.close()
}



I expect the above code to produce one file with the following content:

out.txt:
this is test










      scala






      asked Dec 28 '18 at 14:42









AKS

          2 Answers
































This behavior happens because fss.available() > 0 is the wrong check for such a task. The JavaDoc for InputStream.available() says (emphasis mine):

    Returns an estimate of the number of bytes that can be read (or skipped over) from this input stream without blocking by the next invocation of a method for this input stream.

fss.available() > 0 guarantees that the stream is not finished, but the reverse does not hold: fss.available() might be 0 while you can still read more data. This is true even for file-based InputStreams. For example, assume the file is actually located on a different server mounted via some network file system. In that case, an implementation of available that returns the number of bytes cached on the client side is a reasonable implementation of the interface, because getting more data does require a blocking request over the network.

If you read the JavaDoc for SequenceInputStream.available(), you will see:

    ...
    This method simply calls available of the current underlying input stream and returns the result.

And this is probably the only sane implementation of the interface contract: in general you can't distinguish, in a non-blocking way, between is.available() == 0 because the end was reached and is.available() == 0 because a blocking operation is needed to get more data.

The intent of available is to enable various optimizations, not to serve as an end-of-stream check. The only proper check for whether an InputStream has reached its end is the (potentially blocking) read() == -1.
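A minimal, self-contained sketch of the read() == -1 loop described above; in-memory streams stand in for the files here so the snippet runs as-is, and the "this is test" payload mirrors the question's three files:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, SequenceInputStream}
import scala.collection.JavaConverters.asJavaEnumeration

// Stand-ins for FileInputStreams: two in-memory chunks concatenated by SequenceInputStream.
val fss = new SequenceInputStream(asJavaEnumeration(
  Iterator("this ", "is test").map(s => new ByteArrayInputStream(s.getBytes("UTF-8")): InputStream)))
val os = new ByteArrayOutputStream()

try {
  var c = fss.read()      // blocks until a byte is available or the end is reached
  while (c != -1) {       // -1 is the only reliable end-of-stream signal
    os.write(c)
    c = fss.read()
  }
} finally {
  fss.close()
  os.close()
}

println(os.toString("UTF-8")) // prints: this is test
```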






• Thanks for pointing out the issue with fss.available() > 0. I modified the code to use read() != -1 and it worked as expected: var c = 0; try { while ({ c = fss.read(); c != -1 }) { os.write(c) } } finally { fss.close(); os.close() }

            – AKS
            Jan 1 at 23:11













• @AKS, please note that although this is technically correct, copying data 1 byte at a time is awfully inefficient. If you are going to handle big files, consider one of the other two read methods that take a buffer byte array, or even the java.nio package (take a look at Pipe).

            – SergGr
            Jan 1 at 23:34
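The buffered variant SergGr suggests could be sketched like this (the 4096-byte buffer size is an arbitrary choice, and an in-memory stream stands in for the merged input so the snippet runs as-is):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

val in = new ByteArrayInputStream("this is test".getBytes("UTF-8"))
val os = new ByteArrayOutputStream()
val buf = new Array[Byte](4096)   // reused buffer; size is arbitrary

try {
  var n = in.read(buf)            // read(byte[]) fills up to buf.length bytes per call
  while (n != -1) {               // read(byte[]) also returns -1 at end of stream
    os.write(buf, 0, n)           // write only the n bytes actually read
    n = in.read(buf)
  }
} finally {
  in.close()
  os.close()
}

println(os.toString("UTF-8")) // prints: this is test
```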



































mergeInputStreams is working correctly, but the while loop that writes the InputStream to the FileOutputStream isn't getting all of the content; @SergGr's answer explains very clearly why. Replacing the read-write while loop with the IOUtils copy method writes everything to the file, but possibly in the wrong order. The order of the output is determined by this line of code:

file.listFiles.toIterator

Once we sort the input files, we get this in out.txt:

this
is
test

Here's the code that does this:



import scala.collection.JavaConverters.asJavaEnumeration
import java.io.{File, FileInputStream, InputStream, FileOutputStream, SequenceInputStream, IOException}
import org.apache.commons.io.IOUtils

object Example1 {
  def main(args: Array[String]): Unit = {
    def mergeInputStreams(files: Iterator[File]): InputStream =
      new SequenceInputStream(asJavaEnumeration(files.map(new FileInputStream(_))))
    val d = "/Users/pink/temp"
    val file = new File(d)
    val fss = mergeInputStreams(file.listFiles.toList.sorted.toIterator)
    val os = new FileOutputStream(new File("/Users/pink/tmp/out.txt"))
    try {
      IOUtils.copy(fss, os)
    } catch {
      case e: IOException => println(s"IO exception: ${e.getMessage}")
    }
  }
}


This solution definitely works, but we're using a lot of Java code where side effects happen without the type system's knowledge. Because of that, we have to be careful to catch exceptions and to place IO operations exactly where we want them.



A more functional way to write this would be with fs2 Streams. Here's the example from their README, modified to match what your code is doing:



import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2.{io, text, Stream}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object Example2 extends IOApp {
  private val blockingExecutionContext =
    Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))))(ec => IO(ec.shutdown()))

  val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC =>
    val f1 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file1.txt"), blockingEC, 4096)
    val f2 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file2.txt"), blockingEC, 4096)
    val f3 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file3.txt"), blockingEC, 4096)

    (f2 merge f3 merge f1)
      .through(text.utf8Decode)
      .through(text.lines)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get("/Users/pink/tmp/out.txt"), blockingEC))
  }

  def run(args: List[String]): IO[ExitCode] =
    converter.compile.drain.as(ExitCode.Success)
}


No IO operations happen until run is called, and all exceptions are captured inside the IO type, so we don't have to worry about catching them throughout our code.






            edited Dec 28 '18 at 16:22

























            answered Dec 28 '18 at 16:15









SergGr

            19.9k22243




            19.9k22243













            • Thanks for pointing out issue with fss.available() > 0. I modified code using read() != -1 and it worked as expected. var c = 0 try { while ({ c = fss.read() c != -1 }) { os.write(c) } } finally { fss.close() os.close() }

              – AKS
              Jan 1 at 23:11













            • @AKS, please note that although this is technically a correct way, copying data 1 byte at a time is awfully inefficient. If you are going to handle big files, you should consider using one of the other two read methods that use a buffer byte array or even the java.nio package (take a look at Pipe).

              – SergGr
              Jan 1 at 23:34





















            • Thanks for pointing out issue with fss.available() > 0. I modified code using read() != -1 and it worked as expected. var c = 0 try { while ({ c = fss.read() c != -1 }) { os.write(c) } } finally { fss.close() os.close() }

              – AKS
              Jan 1 at 23:11













            • @AKS, please note that although this is technically a correct way, copying data 1 byte at a time is awfully inefficient. If you are going to handle big files, you should consider using one of the other two read methods that use a buffer byte array or even the java.nio package (take a look at Pipe).

              – SergGr
              Jan 1 at 23:34



















            Thanks for pointing out issue with fss.available() > 0. I modified code using read() != -1 and it worked as expected. var c = 0 try { while ({ c = fss.read() c != -1 }) { os.write(c) } } finally { fss.close() os.close() }

            – AKS
            Jan 1 at 23:11







            Thanks for pointing out issue with fss.available() > 0. I modified code using read() != -1 and it worked as expected. var c = 0 try { while ({ c = fss.read() c != -1 }) { os.write(c) } } finally { fss.close() os.close() }

            – AKS
            Jan 1 at 23:11















            @AKS, please note that although this is technically a correct way, copying data 1 byte at a time is awfully inefficient. If you are going to handle big files, you should consider using one of the other two read methods that use a buffer byte array or even the java.nio package (take a look at Pipe).

            – SergGr
            Jan 1 at 23:34







            @AKS, please note that although this is technically a correct way, copying data 1 byte at a time is awfully inefficient. If you are going to handle big files, you should consider using one of the other two read methods that use a buffer byte array or even the java.nio package (take a look at Pipe).

            – SergGr
            Jan 1 at 23:34















            0














            mergeInputStreams is working correctly, but the while loop for writing the InputStream to the FileOutputStream isn't getting all of the content. @SergGr's answer very clearly states why this is happening. By replacing the read-write while loop with the IOUtils copy method everything gets written to the file, but in the wrong order. The order of the output is determined by this line of code:



            file.listFiles.toIterator


            once we sort the input files, we get this in out.txt:



            this
            is
            test


            Here's the code that does this:



            import scala.collection.JavaConverters.asJavaEnumeration
            import java.io.{File, FileInputStream, InputStream, FileOutputStream, SequenceInputStream, IOException}
            import org.apache.commons.io.IOUtils

            object Example1 {
            def main(args: Array[String]): Unit = {
            def mergeInputStreams(files: Iterator[File]): InputStream =
            new SequenceInputStream(asJavaEnumeration(files.map(new FileInputStream(_))))
            val d = "/Users/pink/temp"
            val file = new File(d)
            val fss = mergeInputStreams(file.listFiles.toList.sorted.toIterator)
            val os = new FileOutputStream(new File("/Users/pink/tmp/out.txt"))
            try {
            IOUtils.copy(fss, os)
            }
            catch {
            case e: IOException => println(s"IO exception: $e.getMessage")
            }
            }
            }


            This solution definitely works, but we're using a lot of Java code where side-effects happen without our type system knowing. Because of that we have to be careful to catch exceptions and place IO operations exactly where we want them.



            A more functional way to write this would be with fs2 Streams. Here's the example from their readme file modified to match what your code is doing:



            import cats.effect.{ExitCode, IO, IOApp, Resource}
            import cats.implicits._
            import fs2.{io, text, Stream}
            import java.nio.file.Paths
            import java.util.concurrent.Executors
            import scala.concurrent.ExecutionContext

            object Example2 extends IOApp {
            private val blockingExecutionContext =
            Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))))(ec => IO(ec.shutdown()))

            val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC =>

            val f1 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file1.txt"), blockingEC, 4096)
            val f2 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file2.txt"), blockingEC, 4096)
            val f3 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file3.txt"), blockingEC, 4096)

            (f2 merge f3 merge f1)
            .through(text.utf8Decode)
            .through(text.lines)
            .intersperse("n")
            .through(text.utf8Encode)
            .through(io.file.writeAll(Paths.get("/Users/pink/tmp/out.txt"), blockingEC))
            }

            def run(args: List[String]): IO[ExitCode] =
            converter.compile.drain.as(ExitCode.Success)
            }


            No IO operations happen until run is called, and all exceptions are maintained inside the IO type so we don't have to worry about catching throughout our code.






            share|improve this answer




























              0














              mergeInputStreams is working correctly, but the while loop for writing the InputStream to the FileOutputStream isn't getting all of the content. @SergGr's answer very clearly states why this is happening. By replacing the read-write while loop with the IOUtils copy method everything gets written to the file, but in the wrong order. The order of the output is determined by this line of code:



              file.listFiles.toIterator


              once we sort the input files, we get this in out.txt:



              this
              is
              test


              Here's the code that does this:



              import scala.collection.JavaConverters.asJavaEnumeration
              import java.io.{File, FileInputStream, InputStream, FileOutputStream, SequenceInputStream, IOException}
                mergeInputStreams is working correctly, but the while loop for writing the InputStream to the FileOutputStream isn't getting all of the content. @SergGr's answer very clearly states why this is happening. By replacing the read-write while loop with the IOUtils copy method everything gets written to the file, but in the wrong order. The order of the output is determined by this line of code:



                file.listFiles.toIterator


                Once we sort the input files, we get this in out.txt:


                this
                is
                test
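
                To see why the original `while (fss.available() > 0)` loop stopped after the first file: SequenceInputStream.available() only reports bytes from the underlying stream it is currently positioned on, and it returns 0 at each file boundary, so the loop exits early. The fix is to read until read() returns -1. Here is a minimal, self-contained sketch (the object and method names AvailableDemo and copyAll are mine, just for illustration) that reproduces the merge with in-memory streams:

                ```scala
                import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream, SequenceInputStream}
                import scala.collection.JavaConverters.asJavaEnumeration

                object AvailableDemo {
                  // Read until read() returns -1. available() is only a hint about the
                  // *current* underlying stream, so a SequenceInputStream can report 0
                  // while later streams still hold data.
                  def copyAll(in: InputStream, out: OutputStream): Unit = {
                    val buf = new Array[Byte](4096)
                    var n = in.read(buf)
                    while (n != -1) {
                      out.write(buf, 0, n)
                      n = in.read(buf)
                    }
                  }

                  def main(args: Array[String]): Unit = {
                    // Three in-memory "files", merged exactly like mergeInputStreams does
                    val parts = Iterator("this", "is", "test")
                      .map(s => new ByteArrayInputStream(s.getBytes("UTF-8")))
                    val merged = new SequenceInputStream(asJavaEnumeration(parts))
                    val out = new ByteArrayOutputStream()
                    copyAll(merged, out)
                    println(out.toString("UTF-8")) // prints "thisistest"
                  }
                }
                ```

                An available()-based loop over the same merged stream would print only "this"; the read-until-minus-one loop drains all three parts, which is also what IOUtils.copy does internally.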


                Here's the code that does this:



                import scala.collection.JavaConverters.asJavaEnumeration
                import java.io.{File, FileInputStream, InputStream, FileOutputStream, SequenceInputStream, IOException}
                import org.apache.commons.io.IOUtils

                object Example1 {
                  def main(args: Array[String]): Unit = {
                    def mergeInputStreams(files: Iterator[File]): InputStream =
                      new SequenceInputStream(asJavaEnumeration(files.map(new FileInputStream(_))))
                    val d = "/Users/pink/temp"
                    val file = new File(d)
                    // Sort so the merge order is deterministic; listFiles makes no ordering guarantee
                    val fss = mergeInputStreams(file.listFiles.toList.sorted.toIterator)
                    val os = new FileOutputStream(new File("/Users/pink/tmp/out.txt"))
                    try {
                      IOUtils.copy(fss, os)
                    } catch {
                      case e: IOException => println(s"IO exception: ${e.getMessage}")
                    } finally {
                      fss.close()
                      os.close()
                    }
                  }
                }


                This solution works, but it leans on Java code whose side effects are invisible to the type system. Because of that, we have to be careful to catch exceptions and place IO operations exactly where we want them.



                A more functional way to write this would be with fs2 Streams. Here's the example from their readme file modified to match what your code is doing:



                import cats.effect.{ExitCode, IO, IOApp, Resource}
                import cats.implicits._
                import fs2.{io, text, Stream}
                import java.nio.file.Paths
                import java.util.concurrent.Executors
                import scala.concurrent.ExecutionContext

                object Example2 extends IOApp {
                  private val blockingExecutionContext =
                    Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))))(ec => IO(ec.shutdown()))

                  val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC =>
                    val f1 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file1.txt"), blockingEC, 4096)
                    val f2 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file2.txt"), blockingEC, 4096)
                    val f3 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file3.txt"), blockingEC, 4096)

                    // ++ concatenates the streams in order, matching SequenceInputStream;
                    // merge would interleave their chunks nondeterministically
                    (f1 ++ f2 ++ f3)
                      .through(text.utf8Decode)
                      .through(text.lines)
                      .intersperse("\n")
                      .through(text.utf8Encode)
                      .through(io.file.writeAll(Paths.get("/Users/pink/tmp/out.txt"), blockingEC))
                  }

                  def run(args: List[String]): IO[ExitCode] =
                    converter.compile.drain.as(ExitCode.Success)
                }


                No IO operations happen until run is called, and all exceptions are maintained inside the IO type so we don't have to worry about catching throughout our code.






                answered Dec 28 '18 at 16:37 by codenoodle


























