State management in SmartGraph is implemented at the component level using ReactiveX’s BehaviorSubject, a subject that holds a current value and pushes each new value to its subscribers. Every ReactiveComponent can therefore keep its own state and let other code react when that state changes. Let’s explore how this works in detail.

ReactiveComponent State Management

Each ReactiveComponent stores its state in a dictionary of named BehaviorSubjects and exposes three methods for working with them:

from typing import Any, Dict, Optional

from reactivex.subject import BehaviorSubject

class ReactiveComponent:
    def __init__(self, name: str):
        self.name = name
        self._states: Dict[str, BehaviorSubject] = {}
        # ... other initializations ...

    def create_state(self, key: str, initial_value: Any) -> BehaviorSubject:
        if key not in self._states:
            self._states[key] = BehaviorSubject(initial_value)
        return self._states[key]

    def get_state(self, key: str) -> Optional[BehaviorSubject]:
        return self._states.get(key)

    def update_state(self, key: str, value: Any) -> None:
        if key in self._states:
            self._states[key].on_next(value)

Let’s break down these methods:

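  1. create_state: Creates a BehaviorSubject holding initial_value under the given key and returns it. If the key already exists, the existing subject is returned and initial_value is ignored.
  2. get_state: Returns the BehaviorSubject for a given key, or None if no state was created under that key.
  3. update_state: Emits a new value on the BehaviorSubject of an existing state, which also notifies its subscribers. Unknown keys are silently ignored.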

Using State in a Component

Here’s an example of how you might use state within a custom component:

class CounterComponent(ReactiveComponent):
    def __init__(self, name: str):
        super().__init__(name)
        self.create_state("count", 0)

    async def process(self, input_data: Any) -> Any:
        count_subject = self.get_state("count")
        current_count = count_subject.value
        new_count = current_count + 1
        self.update_state("count", new_count)
        return f"Count is now {new_count}"

In this example, CounterComponent maintains an internal count state. Each time process is called, it reads the current count from the subject’s value attribute, increments it, stores the result, and returns a message containing the new count.

Reactive State Handling

The use of BehaviorSubject for state management allows for reactive programming patterns. You can subscribe to state changes and react accordingly:

class ReactiveCounterComponent(ReactiveComponent):
    def __init__(self, name: str):
        super().__init__(name)
        count_subject = self.create_state("count", 0)
        count_subject.subscribe(lambda x: print(f"Count changed to: {x}"))

    async def process(self, input_data: Any) -> Any:
        current_count = self.get_state("count").value
        new_count = current_count + 1
        self.update_state("count", new_count)
        return new_count

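In this component, we subscribe to the count state in the constructor. Because a BehaviorSubject emits its current value to each new subscriber, “Count changed to: 0” is printed as soon as the component is created, and every later change is printed automatically.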

State in the Pipeline Context

While state is managed at the component level, you can use it effectively within a pipeline:

from smartgraph import ReactiveSmartGraph

graph = ReactiveSmartGraph()
pipeline = graph.create_pipeline("StatefulPipeline")

counter = ReactiveCounterComponent("Counter")
pipeline.add_component(counter)

# More components can be added here

graph.compile()

# State is maintained across calls (run these awaits inside an async function)
result1 = await graph.execute_and_await("StatefulPipeline", None)  # Returns 1
result2 = await graph.execute_and_await("StatefulPipeline", None)  # Returns 2

In this example, the Counter component maintains its state across multiple executions of the pipeline.

Comprehensive State Management Example

Let’s create a simple language learning application that uses state management to track a user’s progress, adapt difficulty, and provide personalized feedback. This example will demonstrate how to use state within components and across a pipeline.

import asyncio
import random
from typing import List, Dict, Any
from smartgraph import ReactiveSmartGraph, ReactiveComponent

class VocabularyBank(ReactiveComponent):
    def __init__(self, name: str):
        super().__init__(name)
        self.create_state("words", {
            "easy": ["cat", "dog", "house", "tree", "book"],
            "medium": ["mountain", "computer", "elephant", "birthday", "adventure"],
            "hard": ["serendipity", "eloquent", "ephemeral", "ubiquitous", "paradigm"]
        })

    async def process(self, difficulty: str) -> str:
        words = self.get_state("words").value
        return random.choice(words[difficulty])

class DifficultyManager(ReactiveComponent):
    def __init__(self, name: str):
        super().__init__(name)
        self.create_state("current_difficulty", "easy")
        self.create_state("correct_streak", 0)

    async def process(self, correct: bool) -> str:
        current_difficulty = self.get_state("current_difficulty").value
        streak = self.get_state("correct_streak").value

        if correct:
            streak += 1
            if streak >= 3 and current_difficulty != "hard":
                current_difficulty = "medium" if current_difficulty == "easy" else "hard"
                streak = 0
        else:
            streak = 0
            if current_difficulty != "easy":
                # Step down one level at a time: hard -> medium -> easy.
                current_difficulty = "medium" if current_difficulty == "hard" else "easy"

        self.update_state("current_difficulty", current_difficulty)
        self.update_state("correct_streak", streak)
        return current_difficulty

class UserProgressTracker(ReactiveComponent):
    def __init__(self, name: str):
        super().__init__(name)
        self.create_state("total_words", 0)
        self.create_state("correct_words", 0)

    async def process(self, correct: bool) -> Dict[str, Any]:
        total = self.get_state("total_words").value + 1
        correct_count = self.get_state("correct_words").value + (1 if correct else 0)

        self.update_state("total_words", total)
        self.update_state("correct_words", correct_count)

        return {
            "total": total,
            "correct": correct_count,
            "accuracy": (correct_count / total) * 100 if total > 0 else 0
        }

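class LanguageLearningApp(ReactiveSmartGraph):
    def __init__(self):
        super().__init__()
        # Keep a reference so get_word can read the current difficulty.
        self.difficulty_manager = DifficultyManager("Difficulty")

        # One pipeline per component, so each input only reaches the component
        # that expects it (assumes a graph can hold several named pipelines).
        vocabulary_pipeline = self.create_pipeline("Vocabulary")
        vocabulary_pipeline.add_component(VocabularyBank("Vocabulary"))
        difficulty_pipeline = self.create_pipeline("Difficulty")
        difficulty_pipeline.add_component(self.difficulty_manager)
        progress_pipeline = self.create_pipeline("Progress")
        progress_pipeline.add_component(UserProgressTracker("Progress"))

        self.compile()

    async def get_word(self) -> str:
        # Pick a word at the difficulty currently held in DifficultyManager's state.
        difficulty = self.difficulty_manager.get_state("current_difficulty").value
        return await self.execute_and_await("Vocabulary", difficulty)

    async def process_answer(self, correct: bool) -> Dict[str, Any]:
        new_difficulty = await self.execute_and_await("Difficulty", correct)
        progress = await self.execute_and_await("Progress", correct)
        return {"difficulty": new_difficulty, "progress": progress}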

async def main():
    app = LanguageLearningApp()

    for _ in range(10):  # Simulate 10 rounds
        word = await app.get_word()
        print(f"\nTranslate this word: {word}")
        user_input = input("Your translation: ")

        # Simulate checking the answer (in a real app, you'd have actual translation checking)
        correct = random.choice([True, False])
        print("Correct!" if correct else "Incorrect.")

        result = await app.process_answer(correct)
        print(f"New difficulty: {result['difficulty']}")
        print(f"Progress: {result['progress']['correct']}/{result['progress']['total']} "
              f"({result['progress']['accuracy']:.2f}% accuracy)")

if __name__ == "__main__":
    asyncio.run(main())

This example demonstrates several key aspects of state management in SmartGraph:

  1. Component-Level State: Each component (VocabularyBank, DifficultyManager, and UserProgressTracker) maintains its own internal state using create_state, get_state, and update_state.

  2. Reactive State Handling: The DifficultyManager reacts to correct/incorrect answers by updating the difficulty level and streak count.

  3. State Persistence Across Calls: The UserProgressTracker maintains a running total of words and correct answers across multiple executions.

  4. State in Pipeline Context: The LanguageLearningApp routes each input to the component that expects it and combines the results, while the state itself stays inside the components.

  5. State-Driven Logic: The difficulty of words and the user’s progress are determined by the current state of the application.

When you run this application, you’ll see how the state changes affect the behavior of the system:

  • The difficulty of words will increase or decrease based on the user’s performance.
  • The application keeps track of the user’s overall progress and accuracy.
  • Each component maintains its own state, but they work together to create a cohesive learning experience.
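The adaptive-difficulty rule can also be written as a pure function, which makes the transitions easy to check in isolation. The sketch below assumes a one-level-at-a-time policy (three correct answers in a row move up a level, any miss moves down a level and resets the streak); LEVELS and next_difficulty are illustrative names, not part of SmartGraph:

```python
from typing import Tuple

LEVELS = ["easy", "medium", "hard"]


def next_difficulty(level: str, streak: int, correct: bool) -> Tuple[str, int]:
    """Return the (level, streak) pair that follows one answer."""
    index = LEVELS.index(level)
    if correct:
        streak += 1
        # Three correct answers in a row move up, unless already at the top.
        if streak >= 3 and index < len(LEVELS) - 1:
            return LEVELS[index + 1], 0
        return level, streak
    # A miss resets the streak and moves down, unless already at the bottom.
    return LEVELS[max(index - 1, 0)], 0


state = ("easy", 0)
history = []
for answer in [True, True, True, True, False, False]:
    state = next_difficulty(*state, answer)
    history.append(state)
```

In this run the third correct answer promotes the learner to medium, the first miss sends them back to easy, and a further miss leaves them at easy. Keeping the rule separate from the component means the state-holding class only has to read its subjects, call the function, and write the results back.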

This example showcases how SmartGraph’s state management capabilities can be used to create dynamic, adaptive applications with complex internal logic spread across multiple components.

Conclusion

SmartGraph’s state management system, built on ReactiveX’s BehaviorSubject, gives each component named state values that can be read, updated, and subscribed to. This approach allows for reactive, state-driven computations while keeping state encapsulated within individual components.

By leveraging this system, you can build complex, stateful pipelines where each component can maintain and react to its own state changes, leading to more modular and maintainable AI applications.

Next Steps

Now that you understand how to manage state in SmartGraph components, the next section, Using Memory Toolkits, explores how to add persistent storage.