.. _streaming-data-guide:

.. role:: raw-html(raw)
    :format: html

Streaming Data Hands-On Guide
=============================

This guide covers EIR's streaming data functionality,
which allows for real-time data streaming during training.
The guide focuses on how to implement a compatible
WebSocket server that can stream data to EIR.

Overview
--------

EIR includes built-in support for receiving streaming data via WebSocket connections.
To use this functionality, you only need to:

1. Implement a WebSocket server that follows EIR's protocol specification
2. Point to your server's WebSocket address in EIR's configuration files

For example, to use streaming data in EIR,
you would simply specify the WebSocket URL in your configuration:

.. code-block:: yaml

    output_info:
      output_source: ws://localhost:8000/ws
      output_name: text_output
      output_type: sequence

EIR will automatically handle the connection, data receiving, and processing.

Protocol Specification
----------------------

To be compatible with EIR, your WebSocket server must implement the following protocol:

Message Structure
^^^^^^^^^^^^^^^^^

All messages use JSON format:

.. code-block:: python

    {
        "type": str,    # Message type
        "payload": Any  # Message payload
    }

Servers and the EIR client
^^^^^^^^^^^^^^^^^^^^^^^^^^

Before going further, it's useful to define some terminology:

- **Client (EIR)**: The EIR client that connects to the server and processes the streamed data.

  - *This is built into EIR and not modified by you.*

- **Server**: The WebSocket server that streams data to EIR.

  - *This is what you implement and customize.*

.. image:: static/img/eir-server-communication.svg
    :alt: EIR Server Communication

So, while EIR handles the client side,
we have to make sure our server implements
the correct protocol to communicate with EIR.

Core Protocol Messages
^^^^^^^^^^^^^^^^^^^^^^

Your server should be prepared to handle several message types from the EIR client.
At a high level, these are the key interactions:

- **Handshake & Keep-Alive**:

  - ``handshake``: The very first message sent by the client to establish a compatible connection. Your server must respond in kind.
  - ``heartbeat``: A simple message to keep the connection alive and check for responsiveness.

- **Data & Schema Exchange**:

  - ``getInfo``: The client asks for the structure of your data (e.g., input/output names and types). Your server replies with an ``info`` message.
  - ``getData``: The client requests a batch of data samples. Your server replies with a ``data`` message containing the samples.

- **State Management**:

  - ``status``: The client can ask for the current state of your data stream (e.g., how many samples have been sent).
  - ``reset``: The client can instruct the server to reset its data stream to the beginning. This is crucial for allowing EIR to read the stream multiple times (e.g., once for setup and once for training).

Logic Flow When Streaming
-------------------------

When we run ``eirtrain`` where one of the input/output sources is a WebSocket URL,
we can roughly split the process into 3 main phases.

Phase 1: Handshake and Setup
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

First is the setup phase, where the EIR client connects to your WebSocket server
and checks that it can communicate properly.

.. image:: static/img/phase1_diagram.svg
    :alt: Phase 1 Diagram

Below is an example of the connection logic for phase 1
that we can implement in our server.

.. literalinclude:: ../doc_modules/user_guides/simulation_streamer.py
    :language: python
    :caption: simulation_streamer.py - Phase 1 Connection Logic
    :linenos:
    :start-after: # start-connect-websocket
    :end-before: # end-connect-websocket

Phase 2: Data Setup
^^^^^^^^^^^^^^^^^^^

Here, the EIR client requests information about the data structure from the server.
After that, EIR will request data samples until it has received the number set by
``streaming_setup_samples`` (under ``data_preparation`` in the global configuration file).
These samples are saved locally in the experiment directory
during the experiment run phase, and should be deleted
after the experiment is complete. The reason for this is twofold:

1. **Training Data Statistics**
   EIR uses these samples to gather and compute various statistics about
   the potentially raw training data to use for training. For example:

   - The mean and standard deviation of image pixel values for normalization.
   - The vocabulary of text data for tokenization.
   - The unique values in categorical data for encoding.
   - ... and so on.

2. **Validation Data Setup**
   EIR also uses these samples to set up the validation data.

.. image:: static/img/phase2_diagram.svg
    :alt: Phase 2 Diagram

Below is an example of the FastAPI WebSocket endpoint,
including the logic for handling phase 2 messages.

.. literalinclude:: ../doc_modules/user_guides/simulation_streamer.py
    :language: python
    :caption: simulation_streamer.py - Phase 2 Data Setup Logic
    :linenos:
    :start-after: # start-websocket-endpoint
    :end-before: # end-websocket-endpoint

Phase 3: Training Data Streaming
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Finally, we reach the training phase,
where the EIR client requests data samples for training.
There is nothing new here compared with phase 2:
with the data structure already set up,
the server just needs to keep sending data samples to EIR for training.

.. image:: static/img/phase3_diagram.svg
    :alt: Phase 3 Diagram

Putting it All Together
-----------------------

Now that we have the basic logic for each phase,
let's put it all together for a toy example
where we train a seq-to-seq model on a simple, simulated dataset.

Setting up a WebSocket server
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

We will implement our streaming logic in the following files:

- ``single_sample_simulation.py``: Contains the logic for generating a single sample of data, containing both the input and output sequences for the seq-to-seq model.
- ``data_simulator.py``: Contains the logic for generating a batch of data samples, resetting the data stream, and keeping track of the current sample index.
- ``simulation_streamer.py``: The main WebSocket server implementation, which reads data from the simulator and handles the WebSocket connection with EIR.

Here are the three files in their entirety:

.. literalinclude:: ../doc_modules/user_guides/single_sample_simulation.py
    :language: python
    :caption: single_sample_simulation.py - Single Sample Generation Logic

.. literalinclude:: ../doc_modules/user_guides/data_simulator.py
    :language: python
    :caption: data_simulator.py - Batch Data Generation Logic

.. literalinclude:: ../doc_modules/user_guides/simulation_streamer.py
    :language: python
    :caption: simulation_streamer.py - WebSocket Server Implementation

Assuming we have these files in the same directory,
we can run the WebSocket server with
``python simulation_streamer.py`` or ``python -m simulation_streamer``.
This will start the server on ``ws://localhost:8000/ws`` by default.

Training with EIR
^^^^^^^^^^^^^^^^^

Here's the folder structure we'll be working with:

.. literalinclude:: ../tutorials/tutorial_files/user_guides/01_streaming_data/commands/tutorial_folder.txt
    :language: console

Let's look at our configurations. The global config specifies basic training parameters:

.. literalinclude:: ../tutorials/tutorial_files/user_guides/01_streaming_data/globals.yaml
    :language: yaml
    :caption: globals.yaml

For the input configuration, we are using the simulated input sequence
in our data simulator. Notice how we are pointing to the WebSocket server
as the input source:

.. literalinclude:: ../tutorials/tutorial_files/user_guides/01_streaming_data/input.yaml
    :language: yaml
    :caption: input.yaml
    :emphasize-lines: 2

For fusion, we use a simple pass-through configuration since we're only doing sequence generation:

.. literalinclude:: ../tutorials/tutorial_files/user_guides/01_streaming_data/fusion.yaml
    :language: yaml
    :caption: fusion.yaml

Just like the input configuration,
the output configuration specifies our WebSocket server as the output source:

.. literalinclude:: ../tutorials/tutorial_files/user_guides/01_streaming_data/output.yaml
    :language: yaml
    :caption: output.yaml
    :emphasize-lines: 2

Note the ``output_source`` pointing to our WebSocket server.
This tells EIR to expect streaming data from this address.

As mentioned earlier, before starting training,
we need to ensure our streaming server is running.
Once it's running, in another terminal, we can start training:

.. literalinclude:: ../tutorials/tutorial_files/user_guides/01_streaming_data/commands/STREAMING_SEQUENCE_GENERATION.txt
    :language: console

During training, EIR will connect to the streaming server and receive data in batches.
Let's look at some samples generated during training.

At iteration 500:

.. literalinclude:: ../tutorials/tutorial_files/user_guides/01_streaming_data/figures/auto_generated_iter_500.txt
    :language: console
    :caption: Auto-generated sequence at iteration 500

At iteration 2500:

.. literalinclude:: ../tutorials/tutorial_files/user_guides/01_streaming_data/figures/auto_generated_iter_2500.txt
    :language: console
    :caption: Auto-generated sequence at iteration 2500

Here's the training curve showing our progress:

.. image:: ../tutorials/tutorial_files/user_guides/01_streaming_data/figures/training_curve_LOSS.png
    :width: 100%
    :align: center

The nice thing here is that once we have the major pieces in place for streaming data,
it is easy to adapt the setup to different logic within the current experiment
(e.g., changing the data generation logic) or to apply it to different datasets.
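
As a compact recap of the message types listed under Core Protocol Messages,
the request/reply handling can be sketched as a single dispatch function that is
independent of any web framework. This is only a minimal sketch, not EIR's reference
implementation: the payload field names (``version``, ``batch_size``, ``samples_sent``),
the reply types used for ``heartbeat``, ``status``, and ``reset``, the ``error`` reply,
the ``text_input`` name, and the ``ToyStream`` helper are all assumptions made for
illustration. Check ``simulation_streamer.py`` for the fields EIR actually expects.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List

PROTOCOL_VERSION = "1.0"  # assumption: placeholder, not EIR's real version string


@dataclass
class ToyStream:
    """Hypothetical in-memory data source standing in for a real simulator."""

    samples: List[Dict[str, Any]]
    position: int = 0

    def next_batch(self, batch_size: int) -> List[Dict[str, Any]]:
        # Return up to batch_size samples and advance the cursor.
        batch = self.samples[self.position : self.position + batch_size]
        self.position += len(batch)
        return batch

    def reset(self) -> None:
        # Rewind so the stream can be read again (setup pass, then training).
        self.position = 0


def handle_message(raw: str, stream: ToyStream) -> str:
    """Map one incoming JSON message to one JSON reply (payloads are assumed)."""
    message = json.loads(raw)
    msg_type = message.get("type")
    payload = message.get("payload")

    if msg_type == "handshake":
        # The guide says the server must respond in kind.
        reply = {"type": "handshake", "payload": {"version": PROTOCOL_VERSION}}
    elif msg_type == "heartbeat":
        reply = {"type": "heartbeat", "payload": {}}
    elif msg_type == "getInfo":
        # Assumed schema layout: names mapped to their data types.
        info = {
            "inputs": {"text_input": {"type": "sequence"}},
            "outputs": {"text_output": {"type": "sequence"}},
        }
        reply = {"type": "info", "payload": info}
    elif msg_type == "getData":
        size = payload.get("batch_size", 2) if isinstance(payload, dict) else 2
        reply = {"type": "data", "payload": stream.next_batch(size)}
    elif msg_type == "status":
        reply = {"type": "status", "payload": {"samples_sent": stream.position}}
    elif msg_type == "reset":
        stream.reset()
        reply = {"type": "reset", "payload": {"status": "ok"}}
    else:
        reply = {"type": "error", "payload": {"message": f"unknown type: {msg_type}"}}

    return json.dumps(reply)
```

Keeping the dispatch logic in a plain function like this makes it testable without
opening a socket. A WebSocket endpoint would then only need to pass each received
text frame to ``handle_message`` and send back the returned string.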