Refactor packet stream handling

This commit is contained in:
Daniel Huigens 2025-05-20 01:18:18 +02:00
parent c60d8c9d91
commit 62c4d9907b
No known key found for this signature in database
GPG Key ID: CB064A128FA90686
2 changed files with 33 additions and 43 deletions

View File

@ -20,7 +20,7 @@
* @module packet/packet
*/
import { ArrayStream, getWriter as streamGetWriter, getReader as streamGetReader } from '@openpgp/web-stream-tools';
import { ArrayStream, getWriter as streamGetWriter } from '@openpgp/web-stream-tools';
import enums from '../enums';
import util from '../util';
@ -114,8 +114,7 @@ export function supportsStreaming(tag) {
* @param {Function} callback - Function to call with the parsed packet
* @returns {Boolean} Returns false if the stream was empty and parsing is done, and true otherwise.
*/
export async function readPackets(input, callback) {
const reader = streamGetReader(input);
export async function readPacket(reader, useStreamType, callback) {
let writer;
let callbackReturned;
try {
@ -146,8 +145,8 @@ export async function readPackets(input, callback) {
const packetSupportsStreaming = supportsStreaming(tag);
let packet = null;
if (packetSupportsStreaming) {
if (util.isStream(input) === 'array') {
if (useStreamType && packetSupportsStreaming) {
if (useStreamType === 'array') {
const arrayStream = new ArrayStream();
writer = streamGetWriter(arrayStream);
packet = arrayStream;
@ -240,38 +239,6 @@ export async function readPackets(input, callback) {
}
} while (wasPartialLength);
// If this was not a packet that "supports streaming", we peek to check
// whether it is the last packet in the message. We peek 2 bytes instead
// of 1 because the beginning of this function also peeks 2 bytes, and we
// want to cut a `subarray` of the correct length into `web-stream-tools`'
// `externalBuffer` as a tiny optimization here.
//
// If it *was* a streaming packet (i.e. the data packets), we peek at the
// entire remainder of the stream, in order to forward errors in the
// remainder of the stream to the packet data. (Note that this means we
// read/peek at all signature packets before closing the literal data
// packet, for example.) This forwards MDC errors to the literal data
// stream, for example, so that they don't get lost / forgotten on
// decryptedMessage.packets.stream, which we never look at.
//
// An example of what we do when stream-parsing a message containing
// [ one-pass signature packet, literal data packet, signature packet ]:
// 1. Read the one-pass signature packet
// 2. Peek 2 bytes of the literal data packet
// 3. Parse the one-pass signature packet
//
// 4. Read the literal data packet, simultaneously stream-parsing it
// 5. Peek until the end of the message
// 6. Finish parsing the literal data packet
//
// 7. Read the signature packet again (we already peeked at it in step 5)
// 8. Peek at the end of the stream again (`peekBytes` returns undefined)
// 9. Parse the signature packet
//
// Note that this means that if there's an error in the very end of the
// stream, such as an MDC error, we throw in step 5 instead of in step 8
// (or never), which is the point of this exercise.
const nextPacket = await reader.peekBytes(packetSupportsStreaming ? Infinity : 2);
if (writer) {
await writer.ready;
await writer.close();
@ -280,7 +247,6 @@ export async function readPackets(input, callback) {
// eslint-disable-next-line callback-return
await callback({ tag, packet });
}
return !nextPacket || !nextPacket.length;
} catch (e) {
if (writer) {
await writer.abort(e);
@ -292,7 +258,6 @@ export async function readPackets(input, callback) {
if (writer) {
await callbackReturned;
}
reader.releaseLock();
}
}

View File

@ -1,6 +1,6 @@
import { transformPair as streamTransformPair, transform as streamTransform, getWriter as streamGetWriter, getReader as streamGetReader, clone as streamClone, readToEnd as streamReadToEnd } from '@openpgp/web-stream-tools';
import { transformPair as streamTransformPair, transform as streamTransform, getWriter as streamGetWriter, getReader as streamGetReader, clone as streamClone } from '@openpgp/web-stream-tools';
import {
readPackets, supportsStreaming,
readPacket, supportsStreaming,
writeTag, writeHeader,
writePartialLength, writeSimpleLength,
UnparseablePacket,
@ -71,12 +71,15 @@ class PacketList extends Array {
}
const tagsRead = [];
this.stream = streamTransformPair(bytes, async (readable, writable) => {
const reader = streamGetReader(readable);
const writer = streamGetWriter(writable);
try {
let useStreamType = util.isStream(readable);
while (true) {
await writer.ready;
let error;
const done = await readPackets(readable, async parsed => {
let wasStream;
await readPacket(reader, useStreamType, async parsed => {
try {
if (parsed.tag === enums.packet.marker || parsed.tag === enums.packet.trust || parsed.tag === enums.packet.padding) {
// According to the spec, these packet types should be ignored and not cause parsing errors, even if not explicitly allowed:
@ -91,6 +94,7 @@ class PacketList extends Array {
tagsRead.push(parsed.tag);
packet.packets = new PacketList();
packet.fromStream = util.isStream(parsed.packet);
wasStream = packet.fromStream;
try {
await packet.read(parsed.packet, config);
} catch (e) {
@ -147,14 +151,35 @@ class PacketList extends Array {
util.printDebugError(e);
}
});
if (wasStream) {
// Don't allow more than one streaming packet, as read errors
// may get lost in the second packet's data stream.
useStreamType = null;
}
// If there was a parse error, read the entire input first
// in case there's an MDC error, which should take precedence.
if (error) {
await streamReadToEnd(readable);
await reader.readToEnd();
// eslint-disable-next-line @typescript-eslint/no-throw-literal
throw error;
}
// We peek to check whether this was the last packet.
//
// If this was not a packet that supports streaming, we peek 2
// bytes instead of 1 because `readPacket` also peeks 2 bytes,
// and we want to cut a `subarray` of the correct length into
// `web-stream-tools`' `externalBuffer` as a tiny optimization
// here.
//
// If it *was* a streaming packet (i.e. the data packets), we
// peek at the entire remainder of the stream, in order to
// forward errors in the remainder of the stream to the packet
// stream. This throws MDC errors, for example, so that they
// take precedence over parse errors.
const nextPacket = await reader.peekBytes(wasStream ? Infinity : 2);
const done = !nextPacket || !nextPacket.length;
if (done) {
// Here we are past the MDC check for SEIPDv1 data, hence
// the data is always authenticated at this point.