Fuzion Logo
fuzion-lang.dev — The Fuzion Language Portal
JavaScript seems to be disabled. Functionality is limited.

Stream.fz


# This file is part of the Fuzion language implementation.
#
# The Fuzion language implementation is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, version 3 of the License.
#
# The Fuzion language implementation is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License along with The
# Fuzion language implementation.  If not, see <https://www.gnu.org/licenses/>.


# -----------------------------------------------------------------------
#
#  Tokiwa Software GmbH, Germany
#
#  Source code of Fuzion standard library feature stream
#
#  Author: Fridtjof Siebert (siebert@tokiwa.software)
#
# -----------------------------------------------------------------------

# stream -- a stream of values
#
# A stream contains mutable state, so it cannot be reused or shared
# between threads.
#
# The mutable nature of streams requires particular prudence, as even basic
# actions, such as calling as_string on a stream will consume values and thus
# change the state of the stream, as the following example demonstrates:
#
#   a := [1, 2, 3, 4, 5].as_stream
#   say a
#   say a
#
# In this example, the first invocation of say will print "1, 2, 3, 4, 5",
# the second invocation will print "".
#
# NYI: Check if stream should be replaced by a lazy list, which is a choice
# of either nil or a tuple (head, tail). This should avoid the need to store
# mutable state.
public stream(public T type) ref : Sequence T is

  as_list0 option (list T) := nil

  # create a list from this stream
  #
  public redef as_list list T =>
    if !as_list0.exists
      set as_list0 := from_stream
    as_list0.get

  # this must not be called more than once!
  private from_stream list T =>
    if has_next
      h := next
      ref : Cons T (list T)
        memoized_tail option (list T) := nil
        head => h
        tail =>
          if !memoized_tail.exists
            set memoized_tail := from_stream
          memoized_tail.get
    else
      nil


  # take n items from stream, less if stream has fewer than n items
  #
  public redef take (n i32) =>
    m : mutate is
    m.go ()->
      for
        a := (mutate.array T).new m, {a.add x ; a}
        i in (1..n)
      while has_next
        x := next
      else
        (a.take i).as_list


  # Return this stream as a stream.
  #
  # This is a helper function that needs to be defined because stream is an heir
  # of Sequence.
  #
  public redef as_stream stream T => stream.this


  # apply f to all elements in this stream
  #
  public redef for_each(f T -> unit) unit => while has_next do f(next)


  # apply 'f' to each element 'e' as long as 'f e'
  #
  public redef for_while(f T -> bool) unit => while has_next && f(next)


  # check if predicate f holds for all elements produced by this stream
  #
  public redef infix ∀ (f T -> bool) bool =>
    while has_next: f next   # has_next implies f next
    until !has_next


  # check if predicate f holds for at least one element produced by this stream
  #
  public redef infix ∃ (f T -> bool) bool =>
    while has_next
    until f next


  # does this stream have one more element?
  #
  public has_next bool => abstract


  # the next element in this stream
  #
  public next T
  /* NYI: C backend creates broken code for calling precondition of abstract feature
    pre
      has_next
   */
  => abstract


  # get the next element or nil if !has_next
  #
  next_if_exists option T =>
    if has_next
      next
    else
      nil


  # print the elements of this stream
  #
  print unit =>
    for_each (x ->
      yak x
      if has_next then yak ", ")


  # count the elements of this stream
  #
  public redef count i32 =>
    # NYI: check if this works: (map i32 x->1).fold i32.sum
    as_list.map i32 x->1
          .fold i32.sum


  # collect all items from this stream into an array
  #
  public redef as_array array T =>
    m : mutate is
    m.go ()->
      a := (m.array T).new m
      while has_next
        a.add next
      a.as_array


  # create a stream that consists of all be the elements if this stream followed
  # by all the elements of s
  #
  Concat_Streams (s stream T) ref : stream T is
    has_next => stream.this.has_next || s.has_next
    next => if (stream.this.has_next) stream.this.next else s.next


  # create a string from the elements of this stream
  #
  redef as_string String =>
    map_to_stream String (x -> x.as_string)
      .fold (String.concat ", ")


  # create a string representation of this stream including all the string
  # representations of its contents, separated by 'sep'.
  #
  public redef as_string (sep String) => as_list.as_string sep


  # map the stream to a new stream applying function f to all elements
  #
  # This performs a lazy mapping, f is called only when the elements
  # are taken from the stream.
  #
  public map_to_stream(B type, f T -> B) stream B =>
    ref : stream B
      has_next => stream.this.has_next
      next B => f stream.this.next


  # NYI this was broken but is meanwhile fixed.
  # Should be removed eventually.
  #
  public map_broken(B type, f T -> B) : stream B is
    has_next => stream.this.has_next
    next B => f stream.this.next


  # fold the elements of this stream using the given monoid.
  #
  # e.g., to sum the elements of a stream of i32, use s.fold i32.sum
  #
  public redef fold (m Monoid T) => fold m.e m


  # fold the elements of this stream using the given monoid m and initial value s.
  #
  # e.g., to sum the elements of a stream of i32, use s.fold i32.sum
  #
  public fold (s T, m Monoid T) =>
    for
      r := s, m.op r next
    while has_next
    else
      r


# streams -- unit type defining features related
# to streams but not requiring an instance
#
public streams is

  # creates a - potentially infinite - stream by using given supplier
  # end of stream is reached once supplier returns nil
  # note that the supplier is called only once for each has_next/next cycle
  #
  public generate(T type, supplier () -> option T) stream T =>

    not_initialized is
    depleted is
    val := mut (choice T not_initialized depleted) not_initialized

    read =>
      match val.get
        not_initialized =>
          match supplier()
            t T => val <- t
            nil => val <- depleted
        * =>
      val.get

    reset =>
      val <- not_initialized

    ref: stream T
      redef has_next bool =>
        match read
          v T => true
          * => false
      redef next T =>
        match read
          v T => reset; v
          * => fuzion.std.panic "there is no next value available"