SplittableOutputStream.java

/*-
 * #%L
 * io.earcam.utilitarian.io
 * %%
 * Copyright (C) 2017 earcam
 * %%
 * SPDX-License-Identifier: (BSD-3-Clause OR EPL-1.0 OR Apache-2.0 OR MIT)
 *
 * You <b>must</b> choose to accept, in full - any individual or combination of
 * the following licenses:
 * <ul>
 * 	<li><a href="https://opensource.org/licenses/BSD-3-Clause">BSD-3-Clause</a></li>
 * 	<li><a href="https://www.eclipse.org/legal/epl-v10.html">EPL-1.0</a></li>
 * 	<li><a href="https://www.apache.org/licenses/LICENSE-2.0">Apache-2.0</a></li>
 * 	<li><a href="https://opensource.org/licenses/MIT">MIT</a></li>
 * </ul>
 * #L%
 */
package io.earcam.utilitarian.io;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.util.Objects;
import java.util.function.Supplier;

import javax.annotation.concurrent.NotThreadSafe;

/**
 * <p>
 * Writes {@code byte}s across a sequence of underlying {@link OutputStream}s ("splits"), only ever
 * splitting at record boundaries; deals with structured (e.g. XML) or unstructured data.
 * </p>
 *
 * <p>
 * User code must invoke {@link #beginRecord()} before writing {@code byte}s, and subsequently delimit safe
 * splitting points by invoking {@link #endRecord()}. The number of {@code byte}s written between the
 * call to {@link #beginRecord()} and call to {@link #endRecord()} must not exceed
 * {@link #maxSize(long)} - ({@code header.length} + {@code footer.length}).
 * </p>
 *
 * <p>
 * A <i>record</i> is defined as any {@code byte}s written between calls to {@link #beginRecord()} and
 * {@link #endRecord()}. Should the maximum file size be specified and the length of a single record (plus
 * header and footer) exceed the maximum then a {@link BufferOverflowException} is thrown by the offending
 * {@code write}. Records are buffered in memory and only reach the underlying {@link OutputStream} when
 * {@link #endRecord()} is called; an empty record is ignored entirely.
 * </p>
 *
 * <p>
 * A new underlying {@link OutputStream} is obtained lazily from the {@link Supplier} when the first record
 * of a split is committed, and each split receives the header first and the footer last. A split is ended
 * when the next record would push it past {@link #maxSize(long)}, when it already holds
 * {@link #maxCount(long)} records, or on {@link #close()}.
 * </p>
 *
 * <p>
 * Common usage would be splitting files, in this case the {@link Supplier} is expected to <i>keep
 * track</i> of output file names.
 * </p>
 *
 * <p>
 * <b>Please note limitation</b>; due to the use of {@code long} internally, the maximum
 * size per-file is limited to {@value java.lang.Long#MAX_VALUE} bytes (roughly 9,223PB or
 * 9,223,000,000GB) per split {@link OutputStream}.
 * </p>
 */
@NotThreadSafe
public class SplittableOutputStream extends OutputStream implements SplittableOutputStreamBuilder, SplittableOutputStreamBuilder.SplitOutputStreamBuilder {

	/** Provides a fresh underlying stream each time a new split is started. */
	private final Supplier<OutputStream> supplier;
	/** Written at the start of every split (private copy of the caller's array). */
	private final byte[] header;
	/** Written at the end of every split (private copy of the caller's array). */
	private final byte[] footer;
	/** Holds only the bytes of the currently open record, until {@link #endRecord()} commits them. */
	private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

	/** The current split, or {@code null} when no split is open. */
	private OutputStream out = null;
	private long maxFileSize = Long.MAX_VALUE;
	private long maxRecordCount = Long.MAX_VALUE;
	/** Bytes written to the current split so far, header included. */
	private long bytesCount;
	/** Records written to the current split so far. */
	private long recordsCount;

	private boolean inScope;


	private SplittableOutputStream(Supplier<OutputStream> supplier, byte[] header, byte[] footer)
	{
		this.supplier = Objects.requireNonNull(supplier, "supplier");
		// defensive copies: the caller may mutate its arrays after construction
		this.header = Objects.requireNonNull(header, "header").clone();
		this.footer = Objects.requireNonNull(footer, "footer").clone();
	}


	/**
	 * Begin building a {@link SplittableOutputStream}
	 *
	 * @param next a {@link Supplier} of the underlying {@link OutputStream}s, invoked once per split
	 * @param header written at the start of each {@link OutputStream} (e.g. file header)
	 * @param footer written at the end of each {@link OutputStream} (e.g. file footer)
	 * @return the builder for further construction
	 * @throws IOException declared for backward compatibility; not thrown by the current implementation
	 * @throws NullPointerException if any argument is {@code null}
	 */
	public static SplittableOutputStreamBuilder splittable(Supplier<OutputStream> next, byte[] header, byte[] footer) throws IOException
	{
		return new SplittableOutputStream(next, header, footer);
	}


	/**
	 * Sets the maximum number of bytes (header and footer included) per split.
	 *
	 * @param bytes the maximum size of each split
	 * @return this builder
	 * @throws IllegalArgumentException if {@code header.length + footer.length > bytes}; the previous
	 *         maximum is then left unchanged
	 */
	@Override
	public SplitOutputStreamBuilder maxSize(long bytes)
	{
		// validate before assigning so a rejected value never takes effect
		checkSanity(header, footer, bytes);
		maxFileSize = bytes;
		return this;
	}


	/**
	 * Sets the maximum number of records per split.
	 *
	 * @param numberOfRecords the maximum record count, must be positive
	 * @return this builder
	 * @throws IllegalArgumentException if {@code numberOfRecords <= 0}
	 */
	@Override
	public SplitOutputStreamBuilder maxCount(long numberOfRecords)
	{
		requireNaturalNumber(numberOfRecords);
		maxRecordCount = numberOfRecords;
		return this;
	}


	private static void requireNaturalNumber(long number)
	{
		if(number <= 0) {
			throw new IllegalArgumentException("A positive, non-zero value is required.  Received: " + number);
		}
	}


	/**
	 * Completes building.
	 *
	 * @return this stream, configured by the preceding builder calls
	 * @throws IOException declared for backward compatibility; not thrown by the current implementation
	 */
	@Override
	public SplittableOutputStream outputStream() throws IOException
	{
		return this;
	}


	private static void checkSanity(byte[] head, byte[] foot, long maxFileSize)
	{
		if((long) head.length + foot.length > maxFileSize) {
			throw new IllegalArgumentException("header.length + footer.length > maxFileSize: " + head.length + " + " + foot.length + " > " + maxFileSize);
		}
	}


	/**
	 * Verifies a record is open and that the record, grown by {@code pendingBytes}, still fits in a
	 * single split alongside a header and footer.
	 *
	 * @throws IllegalStateException if {@link #beginRecord()} has not been called
	 * @throws BufferOverflowException if the record would become too large for any split
	 */
	private void checkBeforeWrite(int pendingBytes)
	{
		if(!inScope) {
			throw new IllegalStateException("Record scope not started");
		}
		// the whole record (bytes already buffered plus this write) must fit, not just this one write
		long recordLength = (long) buffer.size() + pendingBytes;
		if(recordLength + header.length + footer.length > maxFileSize) {
			throw new BufferOverflowException();
		}
	}


	@Override
	public void write(int b) throws IOException
	{
		checkBeforeWrite(1);
		buffer.write(b);
	}


	@Override
	public void write(byte[] bytes) throws IOException
	{
		write(bytes, 0, bytes.length);
	}


	/**
	 * Appends {@code len} bytes to the current record. Overridden so the record size limit applies to,
	 * and bulk copying is used for, every write path.
	 *
	 * @throws IndexOutOfBoundsException if {@code off}/{@code len} are out of range for {@code bytes}
	 * @throws IllegalStateException if no record has been begun
	 * @throws BufferOverflowException if the record would exceed the maximum size
	 */
	@Override
	public void write(byte[] bytes, int off, int len) throws IOException
	{
		if(off < 0 || len < 0 || len > bytes.length - off) {
			throw new IndexOutOfBoundsException("off: " + off + ", len: " + len + ", length: " + bytes.length);
		}
		checkBeforeWrite(len);
		buffer.write(bytes, off, len);
	}


	/**
	 * Called to mark the beginning of a <i>record</i> (where a "record" is any block
	 * of bytes that can only be treated atomically; in that it's valid to split content
	 * at the record's boundaries).
	 *
	 * @throws IllegalStateException if a record is already open
	 *
	 * @see #endRecord()
	 */
	public void beginRecord()
	{
		if(inScope) {
			throw new IllegalStateException("Record scope already started");
		}
		inScope = true;
	}


	/**
	 * Called to mark the end of a <i>record</i>; commits the buffered record to the current split,
	 * first ending that split (and starting a new one) if the record would not fit in it or the split
	 * already holds the maximum number of records. An empty record is discarded without effect.
	 *
	 * @throws IOException rethrows anything from the underlying {@link OutputStream}
	 * @throws IllegalStateException if no record has been begun
	 * @throws NullPointerException if the {@link Supplier} returns {@code null}
	 *
	 * @see #beginRecord()
	 */
	public void endRecord() throws IOException
	{
		endScope();
		if(buffer.size() == 0) {
			return;
		}
		if(out != null && (maxRecordsReached() || !fitsInCurrentSplit())) {
			endSplit();
		}
		if(out == null) {
			beginSplit();
		}
		buffer.writeTo(out);
		bytesCount += buffer.size();
		++recordsCount;
		buffer.reset();
	}


	private void endScope()
	{
		if(!inScope) {
			throw new IllegalStateException("Record scope not started, cannot end");
		}
		inScope = false;
	}


	/** @return whether the buffered record plus a footer still fit within the current split */
	private boolean fitsInCurrentSplit()
	{
		// subtraction form avoids long overflow when maxFileSize is near Long.MAX_VALUE
		return maxFileSize - bytesCount >= (long) buffer.size() + footer.length;
	}


	/** @return whether the current split already holds the maximum number of records */
	private boolean maxRecordsReached()
	{
		return recordsCount >= maxRecordCount;
	}


	/** Opens the next split from the supplier and writes its header. */
	private void beginSplit() throws IOException
	{
		out = Objects.requireNonNull(supplier.get(), "Supplier returned a null OutputStream");
		recordsCount = 0L;
		bytesCount = header.length;
		out.write(header);
	}


	/** Writes the footer to, and closes, the current split (if any); closes even if the footer write fails. */
	private void endSplit() throws IOException
	{
		if(out != null) {
			OutputStream finished = out;
			out = null;
			bytesCount = 0L;
			recordsCount = 0L;
			try(OutputStream closing = finished) {
				closing.write(footer);
			}
		}
	}


	/**
	 * Flushes the current split, if any. Bytes of a record that has not yet been ended remain buffered.
	 *
	 * @throws IOException rethrows anything from the underlying {@link OutputStream}
	 */
	@Override
	public void flush() throws IOException
	{
		if(out != null) {
			out.flush();
		}
	}


	/**
	 * Ends the current split (writing its footer and closing it). Calling this when no records have been
	 * written, or calling it repeatedly, has no effect.
	 *
	 * @throws BufferUnderflowException if bytes have been written to a record that has not been ended;
	 *         in that case nothing is closed, so the record may still be ended and the stream closed again
	 * @throws IOException rethrows anything from the underlying {@link OutputStream}
	 */
	@Override
	public void close() throws IOException
	{
		if(buffer.size() > 0) {
			throw new BufferUnderflowException();
		}
		endSplit();
	}
}