How to stream

Streaming is critical in making applications based on LLMs feel responsive to end-users.

Important LangChain primitives like LLMs, parsers, prompts, retrievers, and agents implement the LangChain Runnable Interface.

This interface provides two general approaches to stream content:

  • .stream(): a default implementation of streaming that streams the final output from the chain.
  • .streamEvents() and .streamLog(): these provide a way to stream both intermediate steps and the final output from the chain.

Let’s take a look at both approaches!

Using Stream

All Runnable objects implement a method called stream.

This method is designed to stream the final output in chunks, yielding each chunk as soon as it is available.

Streaming is only possible if all steps in the program know how to process an input stream; i.e., process the input one chunk at a time, and yield a corresponding output chunk.

The complexity of this processing can vary, from straightforward tasks like emitting tokens produced by an LLM, to more challenging ones like streaming parts of JSON results before the entire JSON is complete.
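
To make the idea of a step that processes its input one chunk at a time more concrete, here is a minimal sketch using plain async generators. It does not use LangChain; fakeTokens, upperCaseStep and collect are hypothetical names invented for this illustration.

```typescript
// Hypothetical stand-in for a model: yields tokens one at a time.
async function* fakeTokens(): AsyncGenerator<string> {
  for (const token of ["Hello", ",", " world", "!"]) {
    yield token;
  }
}

// A streaming-friendly step: it consumes one input chunk and immediately
// yields the corresponding output chunk, without waiting for the rest.
async function* upperCaseStep(
  input: AsyncIterable<string>
): AsyncGenerator<string> {
  for await (const chunk of input) {
    yield chunk.toUpperCase();
  }
}

// Helper that drains a stream into an array so the chunks can be inspected.
async function collect<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of stream) {
    out.push(item);
  }
  return out;
}

// Usage: four chunks go in, four chunks come out.
collect(upperCaseStep(fakeTokens())).then((chunks) => console.log(chunks));
```

Because upperCaseStep never buffers its input, a consumer sees the first output chunk as soon as the first input chunk arrives.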

The best place to start exploring streaming is with the most important components in LLM apps – the models themselves!

LLMs and Chat Models

Large language models can take several seconds to generate a complete response to a query. This is far slower than the ~200-300 ms threshold at which an application feels responsive to an end user.

The key strategy to make the application feel more responsive is to show intermediate progress; e.g., to stream the output from the model token by token.

import "dotenv/config";

Pick your chat model:

Install dependencies

yarn add @langchain/openai 

Add environment variables

OPENAI_API_KEY=your-api-key

Instantiate the model

import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "gpt-3.5-turbo-0125",
  temperature: 0,
});

const stream = await model.stream("Hello! Tell me about yourself.");
const chunks = [];
for await (const chunk of stream) {
  chunks.push(chunk);
  console.log(`${chunk.content}|`);
}
|
Hello|
!|
I|
'm|
an|
AI|
language|
model|
developed|
by|
Open|
AI|
.|
I|
'm|
designed|
to|
assist|
with|
a|
wide|
range|
of|
tasks|
and|
topics|
,|
from|
answering|
questions|
and|
engaging|
in|
conversations|
,|
to|
helping|
with|
writing|
and|
providing|
information|
on|
various|
subjects|
.|
I|
don|
't|
have|
personal|
experiences|
or|
emotions|
,|
as|
I|
'm|
just|
a|
computer|
program|
,|
but|
I|
'm|
here|
to|
help|
and|
provide|
information|
to|
the|
best|
of|
my|
abilities|
.|
Is|
there|
something|
specific|
you|
'd|
like|
to|
know|
or|
discuss|
?|
|

Let’s have a look at one of the raw chunks:

chunks[0];
AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: "", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: "",
name: undefined,
additional_kwargs: {}
}

We got back something called an AIMessageChunk. This chunk represents a part of an AIMessage.

Message chunks are additive by design – one can simply add them up using the .concat() method to get the state of the response so far!

let finalChunk = chunks[0];

for (const chunk of chunks.slice(1, 5)) {
  finalChunk = finalChunk.concat(chunk);
}

finalChunk;
AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: "Hello! I'm", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: "Hello! I'm",
name: undefined,
additional_kwargs: {}
}
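
To illustrate what “additive” means here, the following is a minimal sketch of a chunk type whose concat method joins contents in order. TextChunk and mergeChunks are hypothetical names for this illustration only; the real AIMessageChunk carries more fields and its merge logic is more involved.

```typescript
// Hypothetical minimal chunk type; the real AIMessageChunk carries more fields.
class TextChunk {
  constructor(public readonly content: string) {}

  // Merging two chunks yields a new chunk with the contents joined in order.
  concat(other: TextChunk): TextChunk {
    return new TextChunk(this.content + other.content);
  }
}

// Fold a list of chunks into the response accumulated so far.
function mergeChunks(chunks: TextChunk[]): TextChunk {
  return chunks.reduce((acc, chunk) => acc.concat(chunk), new TextChunk(""));
}

const received = ["", "Hello", "!", " I", "'m"].map((c) => new TextChunk(c));
console.log(mergeChunks(received).content); // prints "Hello! I'm"
```

Since concat returns a new chunk instead of mutating either operand, the partial response can be rebuilt from any prefix of the received chunks.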

Chains

Virtually all LLM applications involve more steps than just a call to a language model.

Let’s build a simple chain using LangChain Expression Language (LCEL) that combines a prompt, model and a parser and verify that streaming works.

We will use StringOutputParser to parse the output from the model. This is a simple parser that extracts the content field from an AIMessageChunk, giving us the token returned by the model.

tip

LCEL is a declarative way to specify a “program” by chaining together different LangChain primitives. Chains created using LCEL benefit from an automatic implementation of stream, allowing streaming of the final output. In fact, chains created with LCEL implement the entire standard Runnable interface.

import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate } from "@langchain/core/prompts";

const prompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}");

const parser = new StringOutputParser();

const chain = prompt.pipe(model).pipe(parser);

const stream = await chain.stream({
  topic: "parrot",
});

for await (const chunk of stream) {
  console.log(`${chunk}|`);
}
|
Sure|
!|
Here|
's|
a|
par|
rot|
-themed|
joke|
for|
you|
:

|
Why|
did|
the|
par|
rot|
bring|
a|
ladder|
to|
the|
party|
?

|
Because|
it|
wanted|
to|
be|
a|
high|
f|
lier|
!|
|
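
One way to picture why a chain of streaming-capable steps can itself stream is to treat each step as a function from an input stream to an output stream and compose them. The sketch below assumes exactly that and does not reflect how LCEL is implemented; Step, pipe, fakeModel, contentParser, single and sketchChain are hypothetical names.

```typescript
// A step maps an input stream to an output stream.
type Step<I, O> = (input: AsyncIterable<I>) => AsyncIterable<O>;

// Hypothetical helper: compose two steps so chunks flow through both lazily.
function pipe<A, B, C>(first: Step<A, B>, second: Step<B, C>): Step<A, C> {
  return (input) => second(first(input));
}

// Stand-in for a model: emits message-like objects, one word per chunk.
async function* fakeModel(
  prompts: AsyncIterable<string>
): AsyncGenerator<{ content: string }> {
  for await (const prompt of prompts) {
    for (const word of prompt.split(" ")) {
      yield { content: word };
    }
  }
}

// Stand-in for a string parser: extracts the content field of each chunk.
async function* contentParser(
  messages: AsyncIterable<{ content: string }>
): AsyncGenerator<string> {
  for await (const message of messages) {
    yield message.content;
  }
}

// Wrap a single value as a one-element stream.
async function* single<T>(value: T): AsyncGenerator<T> {
  yield value;
}

const sketchChain = pipe(fakeModel, contentParser);

async function runSketchChain(): Promise<string[]> {
  const out: string[] = [];
  for await (const chunk of sketchChain(single("tell me a joke"))) {
    out.push(chunk);
  }
  return out;
}

runSketchChain().then((chunks) => console.log(chunks));
```

Each chunk passes through both steps before the next one is requested, so the composed chain yields output as early as its slowest streaming step allows.
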
note

You do not have to use the LangChain Expression Language to use LangChain and can instead rely on a standard imperative programming approach by calling invoke, batch or stream on each component individually, assigning the results to variables and then using them downstream as you see fit.

If that works for your needs, then that’s fine by us 👌!

Working with Input Streams

What if you wanted to stream JSON from the output as it was being generated?

If you were to rely on JSON.parse to parse the partial JSON, the parsing would fail because the partial JSON wouldn’t be valid JSON.

At that point it might seem that streaming JSON simply isn’t possible.

It turns out there is a way to do it: the parser needs to operate on the input stream and attempt to “auto-complete” the partial JSON into a valid state.

Let’s see such a parser in action to understand what this means.

import { JsonOutputParser } from "@langchain/core/output_parsers";

const chain = model.pipe(new JsonOutputParser());
const stream = await chain.stream(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}
{ countries: [] }
{ countries: [ { name: "" } ] }
{ countries: [ { name: "France" } ] }
{ countries: [ { name: "France", population: "" } ] }
{ countries: [ { name: "France", population: "66" } ] }
{ countries: [ { name: "France", population: "66," } ] }
{ countries: [ { name: "France", population: "66,960" } ] }
{ countries: [ { name: "France", population: "66,960," } ] }
{ countries: [ { name: "France", population: "66,960,000" } ] }
{
countries: [ { name: "France", population: "66,960,000" }, { name: "" } ]
}
{
countries: [ { name: "France", population: "66,960,000" }, { name: "Spain" } ]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "" }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46" }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46," }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46,660" }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46,660," }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46,660,000" }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46,660,000" },
{ name: "" }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46,660,000" },
{ name: "Japan" }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46,660,000" },
{ name: "Japan", population: "" }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46,660,000" },
{ name: "Japan", population: "126" }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46,660,000" },
{ name: "Japan", population: "126," }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46,660,000" },
{ name: "Japan", population: "126,500" }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46,660,000" },
{ name: "Japan", population: "126,500," }
]
}
{
countries: [
{ name: "France", population: "66,960,000" },
{ name: "Spain", population: "46,660,000" },
{ name: "Japan", population: "126,500,000" }
]
}
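
The “auto-complete” idea can be sketched in a few lines: track which strings, objects and arrays are still open in the text received so far, close them, and try to parse the result. This is a simplified illustration, not the algorithm JsonOutputParser uses; parsePartialJson is a hypothetical name, and the sketch gives up on prefixes it cannot repair this way.

```typescript
// Try to turn a JSON prefix into a parseable document by closing whatever
// is still open. Returns undefined when the prefix cannot be repaired this
// simply (for example, when it ends right after a key or a colon).
function parsePartialJson(partial: string): unknown {
  const closers: string[] = []; // stack of pending "}" / "]"
  let inString = false;
  let escaped = false;

  for (let i = 0; i < partial.length; i++) {
    const ch = partial[i];
    if (inString) {
      if (escaped) escaped = false;
      else if (ch === "\\") escaped = true;
      else if (ch === '"') inString = false;
      continue;
    }
    if (ch === '"') inString = true;
    else if (ch === "{") closers.push("}");
    else if (ch === "[") closers.push("]");
    else if (ch === "}" || ch === "]") closers.pop();
  }

  let candidate = partial;
  // Drop a dangling escape character so the closing quote is not escaped.
  if (inString && escaped) candidate = candidate.slice(0, -1);
  if (inString) candidate += '"';
  // Remove a trailing comma, which JSON does not allow before a closer.
  candidate = candidate.replace(/,\s*$/, "");
  candidate += closers.reverse().join("");

  try {
    return JSON.parse(candidate);
  } catch {
    return undefined;
  }
}

// Usage: feed the parser a growing buffer, as a streaming parser would.
const pieces = ['{"countries": [', '{"name": "Fra', 'nce"', "}]}"];
let received = "";
for (const piece of pieces) {
  received += piece;
  console.log(JSON.stringify(parsePartialJson(received)));
}
```

A streaming parser built on this idea would re-run the repair on every new chunk and emit a result only when the repaired text parses and differs from the previous one.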

Now, let’s break streaming. We’ll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON. Since this new last step is just a function call with no defined streaming behavior, the streaming output from previous steps is aggregated, then passed as a single input to the function.

danger

Any steps in the chain that operate on finalized inputs rather than on input streams can break streaming functionality via stream.

tip

Later, we will discuss the streamEvents API which streams results from intermediate steps. This API will stream results from intermediate steps even if the chain contains steps that only operate on finalized inputs.

// A function that operates on finalized inputs rather than on an input stream.
// Because it does not operate on input streams, it breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
  if (!Array.isArray(inputs.countries)) {
    return "";
  }
  return JSON.stringify(inputs.countries.map((country) => country.name));
};

const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);

const stream = await chain.stream(
  `output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}
["France","Spain","Japan"]
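
The single-chunk result can be reproduced without LangChain. In the sketch below, a step that needs the finalized input has to drain the upstream stream before it can call its function, so only one chunk ever comes out. The names snapshots, finalizedOnly and runBlocked are hypothetical, and keeping only the latest snapshot is a simplification of how streamed chunks are aggregated.

```typescript
// Stand-in for a streaming parser: yields progressively larger snapshots.
async function* snapshots(): AsyncGenerator<{ countries: string[] }> {
  yield { countries: [] };
  yield { countries: ["France"] };
  yield { countries: ["France", "Spain"] };
}

// A step that needs the finalized input: it cannot emit anything until the
// upstream stream is exhausted, so the whole chain yields a single chunk.
async function* finalizedOnly<I, O>(
  input: AsyncIterable<I>,
  fn: (last: I) => O
): AsyncGenerator<O> {
  let last: I | undefined;
  let seen = false;
  for await (const chunk of input) {
    last = chunk; // snapshots are cumulative, so the latest one is complete
    seen = true;
  }
  if (seen) yield fn(last as I);
}

async function runBlocked(): Promise<string[]> {
  const out: string[] = [];
  const stream = finalizedOnly(snapshots(), (s) => JSON.stringify(s.countries));
  for await (const chunk of stream) {
    out.push(chunk);
  }
  return out;
}

runBlocked().then((chunks) => console.log(chunks));
```

Three snapshots go in, but the consumer only ever receives one chunk, which is the behavior shown in the output above.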

Non-streaming components

As in the example above, some built-in components, such as retrievers, do not offer any streaming. What happens if we try to stream them?

import { OpenAIEmbeddings } from "@langchain/openai";
import { MemoryVectorStore } from "langchain/vectorstores/memory";
import { ChatPromptTemplate } from "@langchain/core/prompts";

const template = `Answer the question based only on the following context:
{context}

Question: {question}
`;
const prompt = ChatPromptTemplate.fromTemplate(template);

const vectorstore = await MemoryVectorStore.fromTexts(
  ["mitochondria is the powerhouse of the cell", "buildings are made of brick"],
  [{}, {}],
  new OpenAIEmbeddings()
);

const retriever = vectorstore.asRetriever();

const chunks = [];

for await (const chunk of await retriever.stream(
  "What is the powerhouse of the cell?"
)) {
  chunks.push(chunk);
}

console.log(chunks);
[
[
Document {
pageContent: "mitochondria is the powerhouse of the cell",
metadata: {}
},
Document {
pageContent: "buildings are made of brick",
metadata: {}
}
]
]

Stream just yielded the final result from that component.

This is OK! Not all components have to implement streaming – in some cases streaming is either unnecessary, difficult or just doesn’t make sense.

tip

An LCEL chain constructed using some non-streaming components will still be able to stream in a lot of cases, with streaming of partial output starting after the last non-streaming step in the chain.

Here’s an example of this:

import {
  RunnablePassthrough,
  RunnableSequence,
} from "@langchain/core/runnables";
import type { Document } from "@langchain/core/documents";
import { StringOutputParser } from "@langchain/core/output_parsers";

const formatDocs = (docs: Document[]) => {
  return docs.map((doc) => doc.pageContent).join("\n-----\n");
};

const retrievalChain = RunnableSequence.from([
  {
    context: retriever.pipe(formatDocs),
    question: new RunnablePassthrough(),
  },
  prompt,
  model,
  new StringOutputParser(),
]);

const stream = await retrievalChain.stream(
  "What is the powerhouse of the cell?"
);

for await (const chunk of stream) {
  console.log(`${chunk}|`);
}
|
The|
powerhouse|
of|
the|
cell|
is|
the|
mitochond|
ria|
.|
|

Now that we’ve seen how the stream method works, let’s venture into the world of streaming events!

Using Stream Events

Event Streaming is a beta API. This API may change a bit based on feedback.

note

Introduced in @langchain/core 0.1.27.

For the streamEvents method to work properly:

  • Any custom functions / runnables must propagate callbacks.
  • Set proper parameters on models to force the LLM to stream tokens.
  • Let us know if anything doesn’t work as expected!

Event Reference

Below is a reference table that shows some events that might be emitted by the various Runnable objects.

note

When streaming is implemented properly, the inputs to a runnable will not be known until after the input stream has been entirely consumed. This means that inputs will often be included only for end events rather than for start events.

| event | name | chunk | input | output |
|---|---|---|---|---|
| on_llm_start | [model name] | | {'input': 'hello'} | |
| on_llm_stream | [model name] | 'Hello' or AIMessageChunk(content="hello") | | |
| on_llm_end | [model name] | | 'Hello human!' | {"generations": [], "llmOutput": None, …} |
| on_chain_start | format_docs | | | |
| on_chain_stream | format_docs | "hello world!, goodbye world!" | | |
| on_chain_end | format_docs | | [Document(…)] | "hello world!, goodbye world!" |
| on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
| on_tool_stream | some_tool | {"x": 1, "y": "2"} | | |
| on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
| on_retriever_start | [retriever name] | | {"query": "hello"} | |
| on_retriever_chunk | [retriever name] | {documents: []} | | |
| on_retriever_end | [retriever name] | | {"query": "hello"} | {documents: []} |
| on_prompt_start | [template_name] | | {"question": "hello"} | |
| on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, …]) |
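
The note about inputs appearing only on end events can be illustrated with a small wrapper that reports start, stream and end events around a streaming step. This is a sketch under simplifying assumptions, not how streamEvents is implemented; SketchEvent, withEvents, words and shout are hypothetical names, and real events carry additional fields such as run_id, tags and metadata.

```typescript
interface SketchEvent {
  event: string;
  name: string;
  data: { chunk?: unknown; input?: unknown; output?: unknown };
}

// Wrap a streaming step so that it reports start, stream, and end events.
// The full input is only known once the input stream has been consumed,
// so it is attached to the end event rather than the start event.
async function* withEvents(
  name: string,
  input: AsyncIterable<string>,
  step: (input: AsyncIterable<string>) => AsyncIterable<string>
): AsyncGenerator<SketchEvent> {
  const seenInput: string[] = [];
  const seenOutput: string[] = [];

  // Record input chunks as the wrapped step pulls them.
  async function* recordInput(): AsyncGenerator<string> {
    for await (const chunk of input) {
      seenInput.push(chunk);
      yield chunk;
    }
  }

  yield { event: "on_chain_start", name, data: {} };
  for await (const chunk of step(recordInput())) {
    seenOutput.push(chunk);
    yield { event: "on_chain_stream", name, data: { chunk } };
  }
  yield {
    event: "on_chain_end",
    name,
    data: { input: seenInput.join(""), output: seenOutput.join("") },
  };
}

// Hypothetical input stream and step used to exercise the wrapper.
async function* words(): AsyncGenerator<string> {
  yield "hello ";
  yield "world";
}

async function* shout(input: AsyncIterable<string>): AsyncGenerator<string> {
  for await (const chunk of input) {
    yield chunk.toUpperCase();
  }
}

async function runWithEvents(): Promise<SketchEvent[]> {
  const collected: SketchEvent[] = [];
  for await (const e of withEvents("shout", words(), shout)) {
    collected.push(e);
  }
  return collected;
}

runWithEvents().then((collected) => console.log(collected));
```

The start event has an empty data object because nothing has been read yet, while the end event carries both the reassembled input and the reassembled output.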

Chat Model

Let’s start off by looking at the events produced by a chat model.

const events = [];

const eventStream = await model.streamEvents("hello", { version: "v1" });

for await (const event of eventStream) {
  events.push(event);
}

events.length;
13
note

Hey, what’s that funny { version: "v1" } parameter in the API?! 😾

This is a beta API, and we’re almost certainly going to make some changes to it.

This version parameter will allow us to minimize such breaking changes to your code.

In short, we are annoying you now, so we don’t have to annoy you later.

Let’s take a look at a few of the start events and a few of the end events.

events.slice(0, 3);
[
{
run_id: "ce08e556-e8e7-4bfb-b8c0-e51926fc9c0c",
event: "on_llm_start",
name: "ChatOpenAI",
tags: [],
metadata: {},
data: { input: "hello" }
},
{
event: "on_llm_stream",
run_id: "ce08e556-e8e7-4bfb-b8c0-e51926fc9c0c",
tags: [],
metadata: {},
name: "ChatOpenAI",
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: "", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: "",
name: undefined,
additional_kwargs: {}
}
}
},
{
event: "on_llm_stream",
run_id: "ce08e556-e8e7-4bfb-b8c0-e51926fc9c0c",
tags: [],
metadata: {},
name: "ChatOpenAI",
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: "Hello", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: "Hello",
name: undefined,
additional_kwargs: {}
}
}
}
]
events.slice(-2);
[
{
event: "on_llm_stream",
run_id: "ce08e556-e8e7-4bfb-b8c0-e51926fc9c0c",
tags: [],
metadata: {},
name: "ChatOpenAI",
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: "", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: "",
name: undefined,
additional_kwargs: {}
}
}
},
{
event: "on_llm_end",
name: "ChatOpenAI",
run_id: "ce08e556-e8e7-4bfb-b8c0-e51926fc9c0c",
tags: [],
metadata: {},
data: { output: { generations: [ [Array] ] } }
}
]

Chain

Let’s revisit the example chain that parsed streaming JSON to explore the streaming events API.

const chain = model.pipe(new JsonOutputParser());
const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v1" }
);

const events = [];
for await (const event of eventStream) {
  events.push(event);
}

events.length;
117

If you examine the first few events, you’ll notice that there are 3 different start events rather than 2.

The three start events correspond to:

  1. The chain (model + parser)
  2. The model
  3. The parser
events.slice(0, 3);
[
{
run_id: "c486d08d-b426-43c3-8fe0-a943db575133",
event: "on_chain_start",
name: "RunnableSequence",
tags: [],
metadata: {},
data: {
input: "Output a list of the countries france, spain and japan and their populations in JSON format. Use a d"... 129 more characters
}
},
{
event: "on_llm_start",
name: "ChatOpenAI",
run_id: "220e2e35-06d1-4db7-87a4-9c35643eee13",
tags: [ "seq:step:1" ],
metadata: {},
data: { input: { messages: [ [Array] ] } }
},
{
event: "on_parser_start",
name: "JsonOutputParser",
run_id: "34a7abe4-98ae-46ad-85ac-625e724468b1",
tags: [ "seq:step:2" ],
metadata: {},
data: {}
}
]

What do you think you’d see if you looked at the last 3 events? What about the middle?

Let’s use this API to output the stream events from the model and the parser. We’re ignoring start events, end events and events from the chain.

let eventCount = 0;

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v1" }
);

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 30) {
    continue;
  }
  const eventType = event.event;
  if (eventType === "on_llm_stream") {
    console.log(`Chat model chunk: ${event.data.chunk.message.content}`);
  } else if (eventType === "on_parser_stream") {
    console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
  }
  eventCount += 1;
}
Chat model chunk:
Chat model chunk: {"
Chat model chunk: countries
Chat model chunk: ":
Parser chunk: {"countries":[]}
Chat model chunk: [

Chat model chunk:
Chat model chunk: {"
Chat model chunk: name
Chat model chunk: ":
Parser chunk: {"countries":[{"name":""}]}
Chat model chunk: "
Parser chunk: {"countries":[{"name":"fr"}]}
Chat model chunk: fr
Parser chunk: {"countries":[{"name":"france"}]}
Chat model chunk: ance
Chat model chunk: ",
Chat model chunk: "
Chat model chunk: population
Chat model chunk: ":
Parser chunk: {"countries":[{"name":"france","population":""}]}
Chat model chunk: "
Parser chunk: {"countries":[{"name":"france","population":"67"}]}

Because both the model and the parser support streaming, we see streaming events from both components in real time! Neat! 🦜

Filtering Events

Because this API produces so many events, it is useful to be able to filter on events.

You can filter by either component name, component tags or component type.
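
Before looking at each filter in turn, here is a rough sketch of what such include-style filtering could look like when applied to plain event objects. It is an assumption-laden illustration rather than the library's filtering logic: StreamEventLike, EventFilter, eventType and matchesFilter are hypothetical names, and deriving the type from the event name (for example "on_llm_stream" to "llm") is an assumption made for this sketch.

```typescript
interface StreamEventLike {
  event: string;
  name: string;
  tags: string[];
}

interface EventFilter {
  includeNames?: string[];
  includeTypes?: string[];
  includeTags?: string[];
}

// Assumed convention: "on_llm_stream" -> "llm", "on_parser_start" -> "parser".
function eventType(event: string): string {
  return event.replace(/^on_/, "").replace(/_(start|stream|end)$/, "");
}

// With no include lists every event passes; otherwise an event passes when
// it matches at least one of the lists that were provided.
function matchesFilter(e: StreamEventLike, f: EventFilter): boolean {
  const noFilters = !f.includeNames && !f.includeTypes && !f.includeTags;
  if (noFilters) return true;
  return (
    (f.includeNames?.includes(e.name) ?? false) ||
    (f.includeTypes?.includes(eventType(e.event)) ?? false) ||
    (f.includeTags?.some((tag) => e.tags.includes(tag)) ?? false)
  );
}

const sample: StreamEventLike[] = [
  { event: "on_chain_start", name: "RunnableSequence", tags: ["my_chain"] },
  { event: "on_llm_stream", name: "model", tags: ["seq:step:1", "my_chain"] },
  {
    event: "on_parser_stream",
    name: "my_parser",
    tags: ["seq:step:2", "my_chain"],
  },
];

const byName = sample.filter((e) =>
  matchesFilter(e, { includeNames: ["my_parser"] })
);
console.log(byName.map((e) => e.event));
```

Filtering on the consumer side like this still receives every event first; passing the filter options to streamEvents, as the examples in this section do, avoids that.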

By Name

const chain = model
  .withConfig({ runName: "model" })
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v1" },
  { includeNames: ["my_parser"] }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}
{
event: "on_parser_start",
name: "my_parser",
run_id: "c889ec6f-6050-40c2-8fdb-c24ab88606c3",
tags: [ "seq:step:2" ],
metadata: {},
data: {}
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "c889ec6f-6050-40c2-8fdb-c24ab88606c3",
tags: [ "seq:step:2" ],
metadata: {},
data: { chunk: {} }
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "c889ec6f-6050-40c2-8fdb-c24ab88606c3",
tags: [ "seq:step:2" ],
metadata: {},
data: { chunk: { countries: [] } }
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "c889ec6f-6050-40c2-8fdb-c24ab88606c3",
tags: [ "seq:step:2" ],
metadata: {},
data: { chunk: { countries: [ {} ] } }
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "c889ec6f-6050-40c2-8fdb-c24ab88606c3",
tags: [ "seq:step:2" ],
metadata: {},
data: {
chunk: { countries: [ { name: "" } ] }
}
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "c889ec6f-6050-40c2-8fdb-c24ab88606c3",
tags: [ "seq:step:2" ],
metadata: {},
data: {
chunk: { countries: [ { name: "France" } ] }
}
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "c889ec6f-6050-40c2-8fdb-c24ab88606c3",
tags: [ "seq:step:2" ],
metadata: {},
data: {
chunk: { countries: [ { name: "France", population: "" } ] }
}
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "c889ec6f-6050-40c2-8fdb-c24ab88606c3",
tags: [ "seq:step:2" ],
metadata: {},
data: {
chunk: { countries: [ { name: "France", population: "67" } ] }
}
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "c889ec6f-6050-40c2-8fdb-c24ab88606c3",
tags: [ "seq:step:2" ],
metadata: {},
data: {
chunk: { countries: [ { name: "France", population: "67," } ] }
}
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "c889ec6f-6050-40c2-8fdb-c24ab88606c3",
tags: [ "seq:step:2" ],
metadata: {},
data: {
chunk: { countries: [ { name: "France", population: "67,081" } ] }
}
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "c889ec6f-6050-40c2-8fdb-c24ab88606c3",
tags: [ "seq:step:2" ],
metadata: {},
data: {
chunk: { countries: [ { name: "France", population: "67,081," } ] }
}
}

By Type

const chain = model
  .withConfig({ runName: "model" })
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v1" },
  { includeTypes: ["llm"] }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}
{
event: "on_llm_start",
name: "model",
run_id: "0c525b62-0d00-461c-9d1e-1bd8b339e711",
tags: [ "seq:step:1" ],
metadata: {},
data: {
input: { messages: [ [ [HumanMessage] ] ] }
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "0c525b62-0d00-461c-9d1e-1bd8b339e711",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: "", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: "",
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "0c525b62-0d00-461c-9d1e-1bd8b339e711",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "{\n",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: "{\n", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: "{\n",
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "0c525b62-0d00-461c-9d1e-1bd8b339e711",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " ",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: " ", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: " ",
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "0c525b62-0d00-461c-9d1e-1bd8b339e711",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: ' "',
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: ' "', additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: ' "',
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "0c525b62-0d00-461c-9d1e-1bd8b339e711",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "countries",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: "countries", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: "countries",
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "0c525b62-0d00-461c-9d1e-1bd8b339e711",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: '":',
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: '":', additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: '":',
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "0c525b62-0d00-461c-9d1e-1bd8b339e711",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " [\n",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: " [\n", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: " [\n",
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "0c525b62-0d00-461c-9d1e-1bd8b339e711",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " ",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: " ", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: " ",
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "0c525b62-0d00-461c-9d1e-1bd8b339e711",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " {\n",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: " {\n", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: " {\n",
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "0c525b62-0d00-461c-9d1e-1bd8b339e711",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " ",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: " ", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: " ",
name: undefined,
additional_kwargs: {}
}
}
}
}

By Tags

caution

Tags are inherited by child components of a given runnable.

If you’re using tags to filter, make sure that this is what you want.
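
To see why inheritance matters for filtering, the sketch below computes the tags each component would effectively carry when a parent passes its tags down. RunNode, effectiveTags and tree are hypothetical names, and the propagation rule is a simplification based on the tags visible in the output of this section.

```typescript
interface RunNode {
  name: string;
  tags: string[];
  children: RunNode[];
}

// Compute the tags each component effectively carries when a parent's
// tags are passed down to all of its children.
function effectiveTags(
  node: RunNode,
  inherited: string[] = []
): Record<string, string[]> {
  const tags = node.tags.concat(inherited);
  const result: Record<string, string[]> = { [node.name]: tags };
  for (const child of node.children) {
    Object.assign(result, effectiveTags(child, tags));
  }
  return result;
}

// Mirrors the chain used in this section: a tagged sequence with two steps.
const tree: RunNode = {
  name: "RunnableSequence",
  tags: ["my_chain"],
  children: [
    { name: "ChatOpenAI", tags: ["seq:step:1"], children: [] },
    { name: "my_parser", tags: ["seq:step:2"], children: [] },
  ],
};

console.log(effectiveTags(tree));
```

Because both children end up carrying "my_chain", a filter on that tag matches events from the sequence, the model and the parser, not just from the component the tag was attached to.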

const chain = model
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
  .withConfig({ tags: ["my_chain"] });

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v1" },
  { includeTags: ["my_chain"] }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}
{
run_id: "e7abe3de-2402-49f1-a9d7-622f6aa2f5b9",
event: "on_chain_start",
name: "RunnableSequence",
tags: [ "my_chain" ],
metadata: {},
data: {
input: "Output a list of the countries france, spain and japan and their populations in JSON format. Use a d"... 129 more characters
}
}
{
event: "on_llm_start",
name: "ChatOpenAI",
run_id: "4bc4598c-3bf9-44d2-9c30-f9c635875b31",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
input: { messages: [ [ [HumanMessage] ] ] }
}
}
{
event: "on_parser_start",
name: "my_parser",
run_id: "df3b3f2b-8b67-4eeb-9376-a21799475e8f",
tags: [ "seq:step:2", "my_chain" ],
metadata: {},
data: {}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "4bc4598c-3bf9-44d2-9c30-f9c635875b31",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: "", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: "",
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "df3b3f2b-8b67-4eeb-9376-a21799475e8f",
tags: [ "seq:step:2", "my_chain" ],
metadata: {},
data: { chunk: {} }
}
{
event: "on_chain_stream",
run_id: "e7abe3de-2402-49f1-a9d7-622f6aa2f5b9",
tags: [ "my_chain" ],
metadata: {},
name: "RunnableSequence",
data: { chunk: {} }
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "4bc4598c-3bf9-44d2-9c30-f9c635875b31",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "{\n",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: "{\n", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: "{\n",
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "4bc4598c-3bf9-44d2-9c30-f9c635875b31",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " ",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: " ", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: " ",
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "4bc4598c-3bf9-44d2-9c30-f9c635875b31",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: ' "',
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: ' "', additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: ' "',
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "4bc4598c-3bf9-44d2-9c30-f9c635875b31",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "countries",
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: "countries", additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: "countries",
name: undefined,
additional_kwargs: {}
}
}
}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "4bc4598c-3bf9-44d2-9c30-f9c635875b31",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: '":',
generationInfo: { prompt: 0, completion: 0 },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: { content: '":', additional_kwargs: {} },
lc_namespace: [ "langchain_core", "messages" ],
content: '":',
name: undefined,
additional_kwargs: {}
}
}
}
}

Non-streaming components

Remember how some components don’t stream well because they don’t operate on input streams?

While such components can break streaming of the final output when using stream, streamEvents will still yield streaming events from intermediate steps that support streaming!

// A function that operates on finalized inputs rather than on an input stream.
// Because it does not operate on input streams, it breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
  if (!Array.isArray(inputs.countries)) {
    return "";
  }
  return JSON.stringify(inputs.countries.map((country) => country.name));
};

const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);

const stream = await chain.stream(
  `output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}
["France","Spain","Japan"]

As expected, stream yields only a single final chunk because extractCountryNames doesn’t operate on streams.

Now, let’s confirm that with streamEvents we’re still seeing streaming output from the model and the parser.

const eventStream = await chain.streamEvents(
  `output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v1" }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 30) {
    continue;
  }
  const eventType = event.event;
  if (eventType === "on_llm_stream") {
    console.log(`Chat model chunk: ${event.data.chunk.message.content}`);
  } else if (eventType === "on_parser_stream") {
    console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
  }
  eventCount += 1;
}
Chat model chunk:
Parser chunk: {}
Chat model chunk: {

Chat model chunk:
Chat model chunk: "
Chat model chunk: countries
Chat model chunk: ":
Parser chunk: {"countries":[]}
Chat model chunk: [

Chat model chunk:
Parser chunk: {"countries":[{}]}
Chat model chunk: {

Chat model chunk:
Chat model chunk: "
Chat model chunk: name
Chat model chunk: ":
Parser chunk: {"countries":[{"name":""}]}
Chat model chunk: "
Parser chunk: {"countries":[{"name":"France"}]}
Chat model chunk: France
Chat model chunk: ",

Chat model chunk:
Chat model chunk: "
Chat model chunk: population
Chat model chunk: ":
Parser chunk: {"countries":[{"name":"France","population":""}]}
Chat model chunk: "
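
The reason intermediate output stays observable even when the last step blocks can be sketched with a pass-through “tap” that reports each chunk to a listener on its way to the final step. This is only an illustration of the idea, not how streamEvents works internally; Listener, tap, tokens, joinAll and runTapped are hypothetical names.

```typescript
type Listener = (source: string, chunk: string) => void;

// Pass chunks through unchanged while reporting each one to a listener.
async function* tap(
  source: string,
  input: AsyncIterable<string>,
  listener: Listener
): AsyncGenerator<string> {
  for await (const chunk of input) {
    listener(source, chunk);
    yield chunk;
  }
}

// Stand-in for a model that streams two tokens.
async function* tokens(): AsyncGenerator<string> {
  yield "Fr";
  yield "ance";
}

// A final step that needs the complete input before it can produce output.
async function joinAll(input: AsyncIterable<string>): Promise<string> {
  let text = "";
  for await (const chunk of input) {
    text += chunk;
  }
  return text.toUpperCase();
}

async function runTapped(): Promise<{ seen: string[]; final: string }> {
  const seen: string[] = [];
  const final = await joinAll(
    tap("model", tokens(), (source, chunk) => {
      seen.push(`${source}: ${chunk}`);
    })
  );
  return { seen, final };
}

runTapped().then((result) => console.log(result));
```

The listener is called once per token while joinAll is still waiting for the rest of its input, which mirrors how the model and parser chunks above appear before extractCountryNames produces its single result.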
