Restream: regular expression detection implemented as a Transform stream;
and
Replaceable: a regex-based replacement stream that updates incoming data on the fly (possibly with async functions). It transforms data from a stream using a set of regular expressions and makes it possible to build complex pipelines for transforming strings, using cut-and-paste rules to keep certain rules from working on undesired pieces of input; including
SyncReplaceable: the synchronous version of Replaceable, which is just a function and not a stream. It returns the result immediately and is deterministic;
SerialAsyncReplaceable: when rules use asynchronous replacements, the serial-async instance provides a way to run replacements detected with a global regular expression one by one rather than in parallel.
yarn add restream
npm i restream
restream(regex: !RegExp): !stream.Transform
Replaceable
Class
constructor(rules: !(Rule|Array<!Rule>), options=: !stream.TransformOptions)
brake(): void
async replace(data: !(string|Buffer|stream.Stream), context=: !Object<string, *>): !Promise<string>
SerialAsyncReplaceable
Class
The package contains the default restream function and a family of Replaceable classes, as well as functions to create markers and their cut and paste rules. The replace function can be used to end a replaceable instance with some data to transform it.
import restream, {
Replaceable, SyncReplaceable, SerialAsyncReplaceable,
makeMarkers, makeCutRule, makePasteRule,
replace,
} from 'restream'
The types and externs for Google Closure Compiler via Depack are defined in the _restream
namespace.
restream(
regex: !RegExp,
): !stream.Transform
Create a Transform stream which will maintain a buffer with data received from a Readable stream and write data when the buffer can be matched against the regex. It will push the whole match object (or objects when the g flag is used) returned by /regex/.exec(buffer).
!RegExp: The regular expression to execute.
The Transform stream will buffer incoming data and push regex results when matches can be made, i.e. when regex.exec returns a non-null value. When the g flag is added to the regex, multiple matches will be detected.
import restream from 'restream'
(async () => {
try {
const rs = createReadable('test-string-{12345}-{67890}')
const stream = restream(/{(\d+)}/g) // create a transform stream
rs.pipe(stream)
const { data, ws } = createWritable()
stream.pipe(ws)
ws.once('finish', () => {
console.log(data)
})
} catch (err) {
console.error(err)
}
})()
[
[
'{12345}',
'12345',
index: 12,
input: 'test-string-{12345}-{67890}',
groups: undefined
],
[
'{67890}',
'67890',
index: 20,
input: 'test-string-{12345}-{67890}',
groups: undefined
]
]
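The buffering behaviour described above can be approximated without the package. The following is a minimal sketch and not restream's actual implementation: MatchBuffer and feed are hypothetical names, and the sketch always scans with the g flag so that the scan position can be tracked across chunks.

```javascript
// Hypothetical helper (not part of restream): accumulates incoming text
// and returns only the matches that have not been reported before.
class MatchBuffer {
  constructor(regex) {
    // Force the g flag so that exec() honours lastIndex.
    const flags = regex.flags.includes('g') ? regex.flags : `${regex.flags}g`
    this.regex = new RegExp(regex.source, flags)
    this.buffer = ''
    this.lastIndex = 0 // where the next scan should start
  }
  feed(chunk) {
    this.buffer += chunk
    const matches = []
    // exec() resets lastIndex to 0 when it fails, so restore our own copy.
    this.regex.lastIndex = this.lastIndex
    let match
    while ((match = this.regex.exec(this.buffer)) !== null) {
      matches.push(match)
      // Step over zero-length matches to avoid an endless loop.
      if (match[0] === '') this.regex.lastIndex++
      this.lastIndex = this.regex.lastIndex
    }
    return matches
  }
}

const mb = new MatchBuffer(/{(\d+)}/g)
console.log(mb.feed('test-string-{123').length) // 0
console.log(mb.feed('45}-{67890}').map(m => m[1])) // [ '12345', '67890' ]
```

Inside a Transform, feed would be called from the transform method and each returned match pushed downstream in object mode. Note that a greedy pattern such as /\d+/ could be reported before the rest of the number arrives in the next chunk; the sketch does not try to handle that.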
Replaceable Class
A Replaceable transform stream can be used to transform data according to a single or multiple rules.
Replaceable extends stream.Transform: An interface for the context accessible via this in replacer functions.
Name | Type | Description |
---|---|---|
constructor | new (rules: !(Rule | Array<!Rule>), options?: !stream.TransformOptions) => Replaceable | Constructor method. |
brake | () => void | After calling this method, any of the following rules and matches within the same rule won't be able to make any more changes. |
replace | (data: !(string | Buffer | stream.Stream), context?: !Object<string, *>) => !Promise<string> | Creates a new Replaceable to replace the given string, buffer or stream using the rules of the current stream. Calling brake will also set _broke on the parent stream. The new Replaceable copies the rules and has the context assigned to it before replacing data. The this value is not shared by parent and child rules, but the context will be updated: const context = { test: this.test }; content = await this.replace(content, context); this.test = context.test . |
Rule Type
Replaceable uses rules to determine how to transform data. Below is the description of the Rule type.
Property | Type | Description | Example |
---|---|---|---|
re* | RegExp | A regular expression. | Detect inline code blocks in markdown: /`(.+?)`/ . |
replacement* | string | function | async function | A replacer either as a string, function, or async function. It will be passed to the string.replace(re, replacement) native JavaScript method. | As a string: INLINE_CODE . |
String Replacement
Replacement as a string. Given a simple string, it will replace a match detected by the rule's regular expression, without consideration for the capturing groups.
Function Replacer
Replacement as a function. See MDN for more documentation on how the replacer function should be implemented.
For example, a function replacer can turn strings like %NPM: documentary% and %NPM: @rqt/aqt% into a markdown badge (as used in documentary).
Async Function Replacer
An asynchronous function to get replacements. The stream won't push any data until the replacer's promise is resolved. Due to implementation details, the regex has to be run against incoming chunks twice, so it might not be ideal for heavy-load applications with many matches.
For example, an async replacer can replace strings like %FORK-js: example example/Replaceable.js% with the output of a forked JavaScript program (as used in documentary).
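One common way to support async replacers on top of the native string.replace method is a two-pass approach, which would explain why the regex runs twice. The sketch below is an assumption about the general technique, not a copy of the package's code; replaceAsync is a hypothetical name.

```javascript
// Hypothetical helper: replace matches of `re` using an async replacer.
async function replaceAsync(text, re, asyncReplacer) {
  const pending = []
  // Pass 1: run the regex only to start every replacement and keep its promise.
  text.replace(re, (...args) => {
    pending.push(asyncReplacer(...args))
    return args[0] // the return value is discarded in this pass
  })
  const results = await Promise.all(pending)
  // Pass 2: run the regex again and substitute the resolved values in order.
  let i = 0
  return text.replace(re, () => results[i++])
}

replaceAsync('a1 b2', /\d/g, async (digit) => String(Number(digit) * 2))
  .then(console.log) // a2 b4
```

Both passes see the same matches in the same order because String.prototype.replace starts a global regex from index 0 each time. All replacers start in the first pass, so they run in parallel.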
constructor(
rules: !(Rule|Array<!Rule>),
options=: !stream.TransformOptions,
): Replaceable
Constructor method.
rules !(Rule | Array<!Rule>): An array with rules, or a single rule.
options !stream.TransformOptions (optional): Options for the transform stream.
Create a Transform stream which will make data available when an incoming chunk has been updated according to the specified rule or rules. The second argument will be passed as options to the Transform constructor if specified.
Matches can be replaced using a string, function or async function. When multiple rules are passed as an array, the string will be replaced multiple times if the latter rules also modify the data.
import { Replaceable } from 'restream'
const dateRule = {
re: /%DATE%/g,
replacement: new Date().toLocaleString(),
}
const emRule = {
re: /__(.+?)__/g,
replacement(match, p1) {
return `<em>${p1}</em>`
},
}
const authorRule = {
re: /^%AUTHOR_ID: (.+?)%$/mg,
async replacement(match, id) {
const name = await new Promise(resolve => {
// pretend to lookup author name from the database
const authors = { 5: 'John' }
resolve(authors[id])
})
return `Author: <strong>${name}</strong>`
},
}
const STRING = `
Hello __Fred__, your username is __fred__.
You have __5__ stars.
%AUTHOR_ID: 5%
on __%DATE%__
`
const replaceable = new Replaceable([
dateRule,
emRule,
authorRule,
])
const rs = createReadable(STRING)
rs
.pipe(replaceable)
.pipe(process.stdout)
Output:
Hello <em>Fred</em>, your username is <em>fred</em>.
You have <em>5</em> stars.
Author: <strong>John</strong>
on <em>5/2/2020, 19:55:12</em>
Replacer Context
Replacer functions will be executed with their context set to the Replaceable instance to which they belong. Both sync and async replacers can use the this keyword to access their Replaceable instance and modify its properties and/or emit events. This provides a mechanism by which replacers can share data between themselves.
For example, we might want to read and parse an external file first, but remember its data for use in following replacers.
Given an external file example/types.json:
{
"TypeA": "A new type with certain properties.",
"TypeB": "A type to represent the state of the world."
}
Replaceable can read it in the first rule, typesRule, and reference its data in the second rule, paramRule:
/** yarn example/context.js */
import { collect } from 'catchment'
import { createReadStream } from 'fs'
import { Replaceable } from 'restream'
import { createReadable } from './lib'
const typesRule = {
re: /^%types: (.+?)%$/mg,
async replacement(match, location) {
const rs = createReadStream(location)
const d = await collect(rs)
const j = JSON.parse(d)
this.types = j // remember types for access in following rules
return match
},
}
const paramRule = {
re: /^ \* @typedef {(.+?)} (.+)(?: .*)?/mg,
replacement(match, type, typeName) {
const description = this.types[typeName]
if (!description) return match
return ` * @typedef {${type}} ${typeName} ${description}`
},
}
const STRING = `
%types: example/types.json%
/**
* @typedef {Object} TypeA
*/
`
const replaceable = new Replaceable([
typesRule,
paramRule,
])
const rs = createReadable(STRING)
rs
.pipe(replaceable)
.pipe(process.stdout)
%types: example/types.json%
/**
* @typedef {Object} TypeA A new type with certain properties.
*/
As can be seen above, the description of the type was automatically updated based on the data read from the file.
All methods on the Replaceable instance can be accessed via this.
brake(): void
After calling this method, any of the following rules and matches within the same rule won't be able to make any more changes.
The brake method makes it possible to stop further rules from processing incoming chunks. If a replacer function is run with a global regex, the succeeding replacements will also have no effect.
import { Replaceable } from 'restream'
(async () => {
const replaceable = new Replaceable([
{
re: /AAA/g,
replacement() {
this.brake() // prevent further replacements
return 'BBB'
},
},
{
re: /AAA/g,
replacement() {
return 'RRR'
},
},
])
replaceable.pipe(process.stdout)
replaceable.end('AAA AAA AAA AAA')
})()
BBB AAA AAA AAA
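The effect of brake can be modelled with a flag that is checked before each rule and before each match. This is only a sketch of the described behaviour using plain string.replace; applyRulesWithBrake and the broke property are hypothetical names and do not come from the package.

```javascript
// Hypothetical model of rules with a brake flag (synchronous replacers only).
function applyRulesWithBrake(input, rules) {
  const context = {
    broke: false,
    brake() { this.broke = true },
  }
  let text = input
  for (const { re, replacement } of rules) {
    if (context.broke) break // later rules are skipped entirely
    text = text.replace(re, (match, ...args) => {
      // later matches of the same rule are left as they are
      if (context.broke) return match
      return typeof replacement === 'function'
        ? replacement.call(context, match, ...args)
        : replacement
    })
  }
  return text
}

const output = applyRulesWithBrake('AAA AAA AAA AAA', [
  { re: /AAA/g, replacement() { this.brake(); return 'BBB' } },
  { re: /AAA/g, replacement() { return 'RRR' } },
])
console.log(output) // BBB AAA AAA AAA
```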
async replace(
data: !(string|Buffer|stream.Stream),
context=: !Object<string, *>,
): !Promise<string>
Creates a new Replaceable to replace the given string, buffer or stream using the rules of the current stream. Calling brake will also set _broke on the parent stream. The new Replaceable copies the rules and has the context assigned to it before replacing data. The this value is not shared by parent and child rules, but the context will be updated: const context = { test: this.test }; content = await this.replace(content, context); this.test = context.test.
data !(string | Buffer | stream.Stream): The input data to replace via the forked Replaceable.
context !Object<string, *> (optional): The context to assign to the new Replaceable.
Rules can recursively spawn new Replaceable instances without having to create them manually. For example, we might detect a match whose content potentially has other matches, but the regex only works on the outer one. In such cases, the async replace method can be used.
import { Replaceable } from 'restream'
const replaceable = new Replaceable({
re: /<(.+?)>([\s\S]+)<\/\1>/gm,
async replacement(m, tag, content) {
content = await this.replace(content)
return `<${tag}-replaced>${content}</${tag}-replaced>`
},
})
const html = `<div>
<span>Hello World</span>
</div>`
const naive = html.replace(/<(.+?)>([\s\S]+)<\/\1>/gm, (m, tag, content) => {
console.log('Plain regexp detected tag <%s>', tag)
// even if the actual match is returned, the inner tag won't be detected
return `<${tag}-replaced>${content}</${tag}-replaced>`
})
console.log('Only the outer match is detected: %s\n---', naive)
;(async () => {
const res = await Replaceable.replace(replaceable, html)
console.log('replaceable.replace finds matches in children:', res)
})()
Plain regexp detected tag <div>
Only the outer match is detected: <div-replaced>
<span>Hello World</span>
</div-replaced>
---
replaceable.replace finds matches in children: <div-replaced>
<span-replaced>Hello World</span-replaced>
</div-replaced>
The method supports passing a context argument because the child rules don't inherit the this property (this might change in the next version). However, since the replace method is async, properties shared between rules (either siblings, or children and parents) must be accessed via an object; otherwise, only the values in the context of a parallel lane are modified and not the overall context (as shown by the last detection in the example below).
import { Replaceable } from 'restream'
const replaceable = new Replaceable({
re: /<(.+?)>([\s\S]+)<\/\1>/gm,
async replacement(m, tag, content) {
console.log('Total found: %s, replacer lane: %s [%s]',
this.context.found, this.lane, tag)
if (this.context.found > 2) {
this.brake()
return m
}
this.context.found++
this.lane++
content = await this.replace(content, {
context: this.context,
lane: this.lane,
})
return `<${tag}-replaced>${content}</${tag}-replaced>`
},
})
const html = `<div>
<details>
<summary>Restream</summary>
2019
</details>
<span>Hello World</span>
<address>London</address>
<em>Art Deco</em>
</div>`
;(async () => {
replaceable.context = { found: 0 }
replaceable.lane = 0
const res = await Replaceable.replace(replaceable, html)
console.log()
console.log(res)
})()
Total found: 0, replacer lane: 0 [div]
Total found: 1, replacer lane: 1 [details]
Total found: 2, replacer lane: 2 [span]
Total found: 3, replacer lane: 3 [address]
Total found: 3, replacer lane: 2 [summary]
<div-replaced>
<details-replaced>
<summary>Restream</summary>
2019
</details-replaced>
<span-replaced>Hello World</span-replaced>
<address>London</address>
<em>Art Deco</em>
</div-replaced>
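The difference between found (shared through an object) and lane (a plain number) can be reproduced with ordinary objects. The sketch below assumes that the child instance receives the passed context through a shallow assignment such as Object.assign; that is an assumption about the mechanism, and parent, child and spawnChild are hypothetical stand-ins rather than restream objects.

```javascript
// Hypothetical stand-in for a parent Replaceable with shared state.
const parent = { context: { found: 0 }, lane: 0 }

// Assumption: the child gets the passed properties via a shallow copy.
const spawnChild = (context) => Object.assign({}, context)

const child = spawnChild({ context: parent.context, lane: parent.lane })
child.context.found++ // mutates the object that the parent also references
child.lane++          // changes only the child's own copy of the number

console.log(parent.context.found, parent.lane) // 1 0
console.log(child.context.found, child.lane)   // 1 1
```

This is why the counter in the example is kept inside this.context, while this.lane diverges between parallel replacers.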
Replacer Errors
If an error happens in a sync or async replacer function, the Replaceable will emit it and close.
/** yarn example/errors.js */
import { Replaceable } from 'restream'
import { createReadable } from './lib'
const replace = () => {
throw new Error('An error occurred during a replacement.')
}
(async () => {
const rs = createReadable('example-string')
const replaceable = new Replaceable([
{
re: /.*/,
replacement(match) {
return replace(match)
},
},
])
rs
.pipe(replaceable)
.on('error', (error) => {
console.log(error)
})
})()
Error: An error occurred during a replacement.
at replace (/Users/anton/artdeco/restream/example/errors.js:6:9)
at Replaceable.replacement (/Users/anton/artdeco/restream/example/errors.js:16:16)
static replace
The static .replace method makes it possible to feed data into the stream and wait until it finishes execution. This works for strings, buffers and streams.
import { Replaceable } from 'restream'
import { Readable } from 'stream'
const example = {
get replaceable() {
const r = new Replaceable({
re: /hello/,
replacement: 'hi',
})
return r
},
}
;(async () => {
const string = await Replaceable.replace(
example.replaceable, 'hello string world')
console.log(string)
const buffer = await Replaceable.replace(
example.replaceable, Buffer.from('hello buffer world'))
console.log(buffer)
const stream = await Replaceable.replace(
example.replaceable, new Readable({
read() {
this.push('hello stream world')
this.push(null)
},
}))
console.log(stream)
})()
hi string world
hi buffer world
hi stream world
Since Replaceable supports the static .replace method, collecting the output manually is rarely necessary, but it can help in certain scenarios.
To collect stream data into memory, the catchment package can be used. It creates a promise that is resolved when the stream finishes.
import { Replaceable } from 'restream'
import Catchment, { collect } from 'catchment'
import { equal } from 'assert'
//0. SETUP: create a replaceable and readable input streams,
// and pipe the input stream into the replaceable.
const replaceable = new Replaceable([
{
re: /hello/i,
replacement() {
return 'WORLD'
},
},
{
re: /world/,
replacement() {
return 'hello'
},
},
])
const rs = createReadable('HELLO world')
rs
.pipe(replaceable)
// 1. Create a writable catchment using constructor.
const catchment = new Catchment()
replaceable.pipe(catchment)
// OR 1. Create a writable catchment and automatically
// pipe into it.
const { promise } = new Catchment({
rs: replaceable,
})
// OR 1+2. Use the collect method which uses a catchment
// internally.
const data = await collect(replaceable)
// 2. WAIT for the catchment streams to finish.
const data2 = await catchment.promise
const data3 = await promise
// Validate that results are the same.
equal(data, data2); equal(data2, data3)
console.log(data)
WORLD hello
SerialAsyncReplaceable Class
SerialAsyncReplaceable extends Replaceable: A class for when serial execution of asynchronous replacements within the same rule is needed.
Name | Type | Description |
---|---|---|
constructor | new () => SerialAsyncReplaceable | Constructor method. |
addItem | () => !Promise<string> | Accepts an async replacer function (the link) to be executed when all previous links in the chain have resolved. |
The SerialAsyncReplaceable can be used whenever there are multiple detections by the same rule that need to be run asynchronously one after another rather than in parallel. This can be achieved by calling the this.addItem(...) method and awaiting the returned promise. Behind the scenes, each replacement awaits the collective promise from previous replacements.
let s = new Date().getTime()
const replaceable = new SerialAsyncReplaceable([
// 1. Use the `this.addItem` method to set up the await chain.
{
re: /---/g,
async replacement() {
const res = await this.addItem(async () => {
await new Promise(r => setTimeout(r, 100))
const d = new Date().getTime()
const delta = d - s
return delta
})
return res
},
},
// 2. All async replacement without `this.addItem` will run in parallel.
{
re: /___/g,
async replacement() {
await new Promise(r => setTimeout(r, 100))
const d = new Date().getTime()
const delta = d - s
return delta
},
},
])
replaceable
.pipe(process.stdout)
replaceable.end(input)
Test: serial 155ms, parallel 467ms,
Example: serial 256ms, parallel 467ms,
Total: serial 362ms, parallel 467ms,
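The chaining idea behind addItem can be illustrated with a promise that every new item is appended to. This is a sketch under the assumption that items simply wait for the previous ones; SerialChain is a hypothetical class and not the package's implementation.

```javascript
// Hypothetical model of a serial chain of async callbacks.
class SerialChain {
  constructor() {
    this.tail = Promise.resolve()
  }
  addItem(fn) {
    // Start fn only after everything added earlier has settled.
    const result = this.tail.then(() => fn())
    // Keep the chain usable even if this link rejects.
    this.tail = result.catch(() => {})
    return result
  }
}

const chain = new SerialChain()
const order = []
const task = (name, ms) => () => new Promise((resolve) => {
  setTimeout(() => { order.push(name); resolve(name) }, ms)
})
Promise.all([
  chain.addItem(task('first', 30)),
  chain.addItem(task('second', 10)),
]).then(() => console.log(order)) // [ 'first', 'second' ]
```

Without the chain, the second task would finish first because its timeout is shorter.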
SyncReplaceable
The SyncReplaceable can be used when data is already stored in memory (for example, if you're running an Azure function with Node.js and it doesn't support streaming) and needs to be transformed using a synchronous flow. This implies that the rules cannot contain asynchronous replacers.
/** yarn e example/sync.js */
import { SyncReplaceable } from 'restream'
const n = ['zero', 'one', 'two', 'three', 'four',
'five', 'six', 'seven', 'eight', 'nine']
const input = `Test String: {12345}
Example Test: {67890}`
const res = SyncReplaceable(input, [
// The rule to map numbers into their names.
{
re: /{(\d+)}/g,
replacement(match, num) {
return num.split('').map((nn) => {
return n[nn]
}).join(', ')
},
},
// The rule to end every line with a dot.
{
re: /^[\s\S]*$/,
replacement(match) {
return match
.split('\n')
.map(a => `${a}.`)
.join('\n')
},
},
])
console.log(res)
Test String: one, two, three, four, five.
Example Test: six, seven, eight, nine, zero.
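As a rough mental model, synchronous replacement amounts to folding the input through each rule with the native string.replace method. The sketch below is an assumption about the general idea, not the package's source; applyRules is a hypothetical name, and the shared context object stands in for the this value available to replacers.

```javascript
// Hypothetical model: apply rules one after another to a string.
function applyRules(input, rules) {
  const context = {} // shared `this` for function replacers
  return rules.reduce((text, { re, replacement }) => {
    const replacer = typeof replacement === 'function'
      ? replacement.bind(context)
      : replacement
    return text.replace(re, replacer)
  }, input)
}

const names = ['zero', 'one', 'two', 'three', 'four',
  'five', 'six', 'seven', 'eight', 'nine']
const result = applyRules('Test String: {12345}\nExample Test: {67890}', [
  { // map digits to their names
    re: /{(\d+)}/g,
    replacement(match, num) {
      return num.split('').map(d => names[d]).join(', ')
    },
  },
  { // end every line with a dot
    re: /^[\s\S]*$/,
    replacement(match) {
      return match.split('\n').map(line => `${line}.`).join('\n')
    },
  },
])
console.log(result)
```

Each rule sees the output of the previous one, which is why the second rule appends dots to lines that already contain the number names.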
Markers can be used to cut some portion of input text according to a regular expression, run necessary replacement rules on the remaining parts, and then restore the cut chunks. In this way, those chunks do not take part in transformations produced by rules, and can be re-inserted into the stream in their original form.
An example use case would be a situation when markdown code blocks need to be transformed into html, however those code blocks don't need to be processed when inside of a comment, such as:
<!--
The following line should be preserved:
**Integrity is the ability to stand by an idea.**
-->
But the next lines should be transformed into HTML:
**Civilization is the process of setting man free from men.**
**Every building is like a person. Single and unrepeatable.**
When using a naïve transformation with a replacement rule for changing ** into <strong>, all of these lines will be transformed, including the one inside the comment.
import { Replaceable } from 'restream'
import { createReadStream } from 'fs'
const FILE = 'example/markers/example.md'
const strongRule = {
re: /\*\*(.+?)\*\*/g,
replacement(match, p1) {
return `<strong>${p1}</strong>`
},
}
;(async () => {
const rs = createReadStream(FILE)
const replaceable = new Replaceable(strongRule)
rs
.pipe(replaceable)
.pipe(process.stdout)
})()
<!--
The following line should be preserved:
<strong>Integrity is the ability to stand by an idea.</strong>
-->
But the next lines should be transformed into HTML:
<strong>Civilization is the process of setting man free from men.</strong>
<strong>Every building is like a person. Single and unrepeatable.</strong>
In the output above, the ** in the comment is also transformed by the rule. To prevent this, the strategy is to cut comments out first using markers, then perform the transformation using the strong rule, and finally paste the comments back into the text.
const { comments } = makeMarkers({
comments: /<!--([\s\S]+?)-->/g,
})
const cutComments = makeCutRule(comments)
const pasteComments = makePasteRule(comments)
const replaceable = new Replaceable([
cutComments,
strongRule,
pasteComments,
])
<!--
The following line should be preserved:
**Integrity is the ability to stand by an idea.**
-->
But the next lines should be transformed into HTML:
<strong>Civilization is the process of setting man free from men.</strong>
<strong>Every building is like a person. Single and unrepeatable.</strong>
makeMarkers(
matchers: !Object<string, !RegExp>,
config=: !MakeMarkersConfig,
): !Object.<string, !Marker>
Make markers from a configuration object. Returns an object with markers for each requested type.
matchers !Object<string, !RegExp>: An object with types of markers to create as keys and their detection regexes as values.
config !MakeMarkersConfig (optional): Additional configuration.
This function will create markers from the passed hash of matchers. The markers are then used to create cut and paste rules.
When a RegExp specified for a marker is matched, the chunk will be replaced with a string. By default, the string has the %%_RESTREAM_MARKER_NAME_REPLACEMENT_INDEX_%% format.
This format can be modified with the additional configuration passed as the second argument by providing a function to generate replacement strings, and their respective regular expressions to replace them back with their original values.
MakeMarkersConfig: Additional configuration.
Name | Type | Description |
---|---|---|
getReplacement | (name: string, index: number) => string | The function used to create a replacement when some text needs to be cut. |
getRegex | (name: string) => !RegExp | The function used to create a RegExp to detect replaced chunks. |
By default, the %%_RESTREAM_${name.toUpperCase()}_REPLACEMENT_${index}_%% replacement is used, with the new RegExp(`%%_RESTREAM_${name.toUpperCase()}_REPLACEMENT_(\\d+)_%%`, 'g') regex to detect it and restore the original value.
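The cut-and-paste cycle with the default placeholder format can be sketched by hand as follows. This only illustrates the idea and is not how the package implements markers: createMarker, cut and paste are hypothetical names, and the sketch assumes the marker name contains no regex special characters.

```javascript
// Hypothetical model of a marker with cut and paste steps.
function createMarker(name, re) {
  const map = {} // index -> original chunk
  let index = 0
  const NAME = name.toUpperCase()
  const pasteRe = new RegExp(`%%_RESTREAM_${NAME}_REPLACEMENT_(\\d+)_%%`, 'g')
  return {
    map,
    // Replace every matched chunk with a numbered placeholder.
    cut: (text) => text.replace(re, (match) => {
      const i = index++
      map[i] = match
      return `%%_RESTREAM_${NAME}_REPLACEMENT_${i}_%%`
    }),
    // Put the original chunks back in place of the placeholders.
    paste: (text) => text.replace(pasteRe, (match, i) => map[i]),
  }
}

const comments = createMarker('comments', /<!--([\s\S]+?)-->/g)
const cut = comments.cut('<!-- **keep** -->\n**bold**')
console.log(cut) // %%_RESTREAM_COMMENTS_REPLACEMENT_0_%% followed by a new line and **bold**
const strong = cut.replace(/\*\*(.+?)\*\*/g, '<strong>$1</strong>')
console.log(comments.paste(strong)) // the comment is intact, only the second line is in <strong>
```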
makeCutRule(
marker: !Marker,
): !Rule
Make a rule for initial replacement of markers.
marker !Marker: A marker is used to cut and paste portions of text to exclude them from processing by other rules. Markers should be created using the makeMarkers factory method that will assign their properties.
Make a rule for the Replaceable to cut out marked chunks so that they don't participate in further transformations.
makePasteRule(
marker: !Marker,
pipeRules=: !(Rule|Array<!Rule>),
): !Rule
Make a rule for pasting markers back.
marker !Marker: A marker is used to cut and paste portions of text to exclude them from processing by other rules. Markers should be created using the makeMarkers factory method that will assign their properties.
pipeRules !(Rule | Array<!Rule>) (optional): Any additional rules to replace the value of the marker before pasting it. Must be synchronous.
Make a rule for the Replaceable to paste back chunks replaced earlier. When pipeRules is given, the value of the marker will be synchronously processed before it is reinserted.
For example, given the following input:
<a href="test_hello_world.html">Example</a>
Restream can prevent _ in links from being transformed into <em> tags, and then transform the link to prepend the # symbol.
const { a } = makeMarkers({
a: /<a\s+.+?>[\s\S]+?<\/a>/gm,
}, {
getReplacement(name, index) {
return `RESTREAM-${name}-${index}`
},
getRegex(name) {
return new RegExp(`RESTREAM-${name}-(\\d+)`, 'g')
},
})
const replaceable = new Replaceable([
makeCutRule(a),
{ re: /_(.+?)_/g, replacement(m, val) {
return `<em>${val}</em>`
} },
makePasteRule(a, {
re: /href="(.+?)"/,
replacement(m, link) {
return `href="#${link}"`
},
}),
])
<a href="#test_hello_world.html">Example</a>
Sometimes, it might be necessary to access the value replaced by a marker's regular expression. In the example below, all inner code blocks are cut at first to preserve them as they are, then the LINKS rule is applied to generate anchors in a text. However, it is also possible that an inner code block will form part of a link, but because it has been replaced with a marker, the link rule will not work properly.
To prevent this from happening, a check must be performed in the LINKS rule replacement function to see if the matched text has any inner code blocks in it. If it does, the original value can be looked up and placed back for the correct generation of the link name. This is achieved with the string's .replace method together with the marker's regExp and map properties.
const getName = (title) => {
const name = title.toLowerCase()
.replace(/\s+/g, '-')
.replace(/[^\w-]/g, '')
return name
}
const { code } = makeMarkers({
code: /`(.+?)`/g,
})
const cutCode = makeCutRule(code)
const pasteCode = makePasteRule(code)
const linkRule = {
re: /\[(.+?)\]\(#LINK\)/g,
replacement(match, title) {
const realTitle = title.replace(code.regExp, (m, i) => {
const val = code.map[i]
return val
})
const name = getName(realTitle)
return `<a name="${name}">${title}</a>`
},
}
const replaceable = new Replaceable([
cutCode,
linkRule,
pasteCode,
])
`a code block`
`[link in a code block](#LINK)`
<a name="just-link">just link</a>
<a name="a-code-block-in-a-link">`A code block` in a link</a>
Now, the link is generated correctly using the title with the text inside of the code block, and not its replaced marker. Also, because the code marker's regex is used with .replace, its lastIndex property won't change, so there are no side effects (compared to using the .exec method of a regular expression). This simple example shows how some markers can gain access to replacements made by other markers, which can have more complex applications.
The following relevant packages might be of interest.
Name | Description |
---|---|
catchment | Collect all data flowing in from the stream into memory, and provide a promise resolved when the stream finishes. |
pedantry | Read a directory as a stream. |
which-stream | Create or choose source and destination (including stdout ) streams easily. |
spawncommand | Spawn or fork a process and return a promise resolved with stdout and stderr data when it exits. |
documentary | Transforms the markdown files to be able to insert the content of example files and their output asynchronously. |
GNU Affero General Public License v3.0
Dual licensed under AGPL-3.0 and Art Deco License for Free Open Source packages. If you require a Paid version of Restream so that you can distribute your software without publishing its source code, please complete a purchase.
© Art Deco™ 2020