Source code for rv.modules.sampler

import logging
from io import BytesIO
from itertools import chain
from struct import pack, unpack
from typing import BinaryIO, List, Optional

from logutils import BraceMessage as _F
from rv.chunks.chunk import Chunk
from rv.controller import Controller
from rv.modules import Behavior as B
from rv.modules import Module
from rv.modules.base.sampler import BaseSampler
from rv.note import NOTE
from rv.readers.reader import read_sunvox_file
from rv.synth import Synth

log = logging.getLogger(__name__)


[docs] class Sampler(BaseSampler, Module): INS_SIGN = b"PMAS" # "SAMP" in little-endian INS_VERSION = 6 XI_ENV_POINTS = 12 chnk = 0x010B options_chnm = 0x0101 behaviors = {B.receives_notes, B.sends_audio} effect: Optional[Synth] class NoteSampleMap(dict): start_note = NOTE.C0 end_note = NOTE.a9 default_sample = 0 def __init__(self): super(Sampler.NoteSampleMap, self).__init__( (NOTE(note_value), self.default_sample) for note_value in range(self.start_note.value, self.end_note.value + 1) ) @property def bytes(self): return bytes(self.values()) @bytes.setter def bytes(self, value): for k, v in zip(self.keys(), value): self[k] = v class Envelope: chnm = None range = None initial_points = None initial_sustain_point = None initial_loop_start_point = None initial_loop_end_point = None initial_enable = None initial_sustain = None initial_loop = None initial_ctl_index = 0 initial_gain_pct = 100 initial_velocity = 0 initial_controller = 0 _legacy_point_bytes = None _legacy_active_points = None _legacy_sustain_point = None _legacy_loop_start_point = None _legacy_loop_end_point = None _legacy_bitmask = None def __init__(self): self.points = self.initial_points[:] self.sustain_point = self.initial_sustain_point self.loop_start_point = self.initial_loop_start_point self.loop_end_point = self.initial_loop_end_point self.enable = self.initial_enable self.sustain = self.initial_sustain self.loop = self.initial_loop self.ctl_index = self.initial_ctl_index self.gain_pct = self.initial_gain_pct self.velocity = self.initial_velocity self.loaded = False @property def bitmask(self): return self.enable | self.sustain * 2 | self.loop * 4 @bitmask.setter def bitmask(self, value): self.enable = bool(value & 1) self.sustain = bool(value & 2) self.loop = bool(value & 4) @property def point_bytes(self): y_points = (y - self.range[0] // 0x200 for y in self._y_values) values = list(chain.from_iterable(zip(self._x_values, y_points))) return pack("<" + "H" * len(values), *values) @property def _x_values(self): values = [x for x, y in self.points] while len(values) < 12: values.append(0) return values[:12] @property def _y_values(self): values = [y // 0x200 for x, y in self.points] while len(values) < 12: values.append(0) return values[:12] def chunks(self): yield b"CHNM", pack("<I", self.chnm) data = pack( "<HBBB", self.bitmask, self.ctl_index, self.gain_pct, self.velocity, ) data += b"\0\0\0" data += pack( "<HHHH", len(self.points), self.sustain_point, self.loop_start_point, self.loop_end_point, ) data += b"\0\0\0\0" for x, y in self.points: data += pack("<HH", x, y - self.range[0]) yield b"CHDT", data def load_chdt(self, chdt): ( self.bitmask, self.ctl_index, self.gain_pct, self.velocity, _, _, _, point_count, self.sustain_point, self.loop_start_point, self.loop_end_point, ) = unpack("<HBBBBBBHHHH", chdt[0:0x10]) points = self.points = [] for i in range(point_count): offset = 0x14 + i * 4 data = chdt[offset : offset + 4] x, y = unpack("<HH", data) min_y = self.range[0] points.append((x, y + min_y)) self.loaded = True class VolumeEnvelope(Envelope): chnm = 0x102 range = (0, 0x8000) initial_enable = True initial_loop = False initial_loop_start_point = 0 initial_loop_end_point = 0 initial_sustain = True initial_sustain_point = 0 initial_points = [(0, 0x8000), (8, 0), (0x80, 0), (0x100, 0)] class PanningEnvelope(Envelope): chnm = 0x103 range = (-0x4000, 0x4000) initial_enable = False initial_loop = False initial_loop_start_point = 0 initial_loop_end_point = 0 initial_sustain = False initial_sustain_point = 0 initial_points = [(0, 0), (0x40, -0x2000), (0x80, 0x2000), (0xB4, 0)] class PitchEnvelope(Envelope): chnm = 0x104 range = (-0x4000, 0x4000) initial_enable = False initial_loop = False initial_loop_start_point = 0 initial_loop_end_point = 0 initial_sustain = False initial_sustain_point = 0 initial_points = [(0, 0), (0x40, 0)] class EffectControlEnvelope(Envelope): range = (0, 0x8000) initial_enable = False initial_loop = False initial_loop_start_point = 0 initial_loop_end_point = 0 initial_sustain = False initial_sustain_point = 0 initial_points = [(0, 0x8000), (0x40, 0x8000)] def __init__(self, chnm): super().__init__() self.chnm = chnm class Sample: def __init__(self): self.data = b"" self._length = 0 self.loop_start = 0 self.loop_len = 0 self.volume = 64 self.finetune = 100 self.format = Sampler.Format.float32 self.channels = Sampler.Channels.stereo self.rate = 44100 self.loop_type = Sampler.LoopType.off self.loop_sustain = False self.panning = 0 self.relative_note = 16 self.reserved2 = 0 self.name = b"" self.start_pos = 0 @property def frame_size(self): size = { Sampler.Format.int8: 1, Sampler.Format.int16: 2, Sampler.Format.float32: 4, } multiplier = {Sampler.Channels.mono: 1, Sampler.Channels.stereo: 2} return size[self.format] * multiplier[self.channels] @property def frames(self): return len(self.data) // self.frame_size vibrato_type = Controller( BaseSampler.VibratoType, BaseSampler.VibratoType.sin, attached=False ) vibrato_attack = Controller((0, 255), 0, attached=False) vibrato_depth = Controller((0, 255), 0, attached=False) vibrato_rate = Controller((0, 63), 0, attached=False) volume_fadeout = Controller((0, 8192), 0, attached=False) samples: List[Optional[Sample]] version: int max_version: int # Legacy instrument_name: bytes volume_old: int ins_finetune: int ins_relative_note: int editor_cursor: int editor_selected_size: int # Unused unused1: int unused2: int unused3: int unused4: int unused5: int unused6: int def __init__(self, **kwargs): super(Sampler, self).__init__(**kwargs) self.volume_envelope = self.VolumeEnvelope() self.panning_envelope = self.PanningEnvelope() self.pitch_envelope = self.PitchEnvelope() self.effect_control_envelopes = [ self.EffectControlEnvelope(0x105), self.EffectControlEnvelope(0x106), self.EffectControlEnvelope(0x107), self.EffectControlEnvelope(0x108), ] self.note_samples = self.NoteSampleMap() self.samples = [None] * 128 self.instrument_name = kwargs.get("instrument_name", b"") self.version = self.INS_VERSION self.max_version = self.INS_VERSION self.unused1 = 0 self.unused2 = 0 self.unused3 = 0 self.unused4 = 0 self.unused5 = 0 self.unused6 = 0 self.volume_old = 64 self.ins_finetune = 0 self.ins_relative_note = 0 self.editor_cursor = 0 self.editor_selected_size = 0 self.effect = None # Special handling for legacy instruments. # Starts out as an empty list, then populated with raw chunks during load. # If the INS_SIGN signature is not found, the `is_legacy` flag is set True. # If it is found, it is set False and `legacy_chunks` is cleared. # When writing out, if this is None then new chunks are written. # If it is a list, then raw chunks are written out instead. # This can go away once we better support legacy instruments. self.is_legacy: bool | None = None self.legacy_chunks: list[Chunk] | None = [] def specialized_iff_chunks(self): if self.is_legacy: for chunk in self.legacy_chunks: yield from chunk.chunks() return iters = [ self.global_config_chunks(), # CHNM 0x0000 self.sample_data_chunks(), # CHNM i*2+1, i*2+2 super(Sampler, self).specialized_iff_chunks(), # CHNM 0x0101 self.volume_envelope.chunks(), # CHNM 0x0102 self.panning_envelope.chunks(), # CHNM 0x0103 self.pitch_envelope.chunks(), # CHNM 0x0104 self.effect_control_envelopes[0].chunks(), # CHNM 0x0105 self.effect_control_envelopes[1].chunks(), # CHNM 0x0106 self.effect_control_envelopes[2].chunks(), # CHNM 0x0107 self.effect_control_envelopes[3].chunks(), # CHNM 0x0108 ] for iter in iters: yield from iter if self.effect: # CHNM 0x010a f = BytesIO() self.effect.write_to(f) yield b"CHNM", b"\x0a\x01\0\0" yield b"CHDT", f.getvalue() def sample_data_chunks(self): for i, sample in enumerate(self.samples): if sample is not None: yield from self.sample_chunks(i, sample) def global_config_chunks(self): f = BytesIO() w = _StructWriter(f) vol = self.volume_envelope pan = self.panning_envelope # uint32_t unused1; w.uint32(self.unused1) # char name[ 22 ]; w.char(self.instrument_name, 22) # uint16_t unused2; w.uint16(self.unused2) # uint16_t samples_num; compacted_samples = self.samples.copy() while compacted_samples and compacted_samples[-1] is None: compacted_samples.pop() w.uint16(len(compacted_samples)) # uint16_t unused3; w.uint16(self.unused3) # uint32_t unused4; w.uint32(self.unused4) # uint8_t smp_num_old[ 96 ]; f.write(self.note_samples.bytes[:96]) # uint16_t volume_points_old[ XI_ENV_POINTS * 2 ]; f.write(vol.point_bytes) # uint16_t panning_points_old[ XI_ENV_POINTS * 2 ]; f.write(pan.point_bytes) # uint8_t volume_points_num_old; w.uint8(len(vol.points)) # uint8_t panning_points_num_old; w.uint8(len(pan.points)) # uint8_t vol_sustain_old; w.uint8(vol.sustain_point) # uint8_t vol_loop_start_old; w.uint8(vol.loop_start_point) # uint8_t vol_loop_end_old; w.uint8(vol.loop_end_point) # uint8_t pan_sustain_old; w.uint8(pan.sustain_point) # uint8_t pan_loop_start_old; w.uint8(pan.loop_start_point) # uint8_t pan_loop_end_old; w.uint8(pan.loop_end_point) # uint8_t volume_type_old; w.uint8(vol.bitmask) # uint8_t panning_type_old; w.uint8(pan.bitmask) # uint8_t vibrato_type; w.uint8(self.vibrato_type.value) # uint8_t vibrato_sweep; w.uint8(self.vibrato_attack) # uint8_t vibrato_depth; w.uint8(self.vibrato_depth) # uint8_t vibrato_rate; w.uint8(self.vibrato_rate) # uint16_t volume_fadeout; w.uint16(self.volume_fadeout) # uint8_t volume_old; w.uint8(self.volume_old) # int8_t finetune; w.int8(self.ins_finetune) # uint8_t unused5; w.uint8(self.unused5) # int8_t relative_note; w.int8(self.ins_relative_note) # uint32_t unused6; w.uint32(self.unused6) # uint32_t sign; f.write(self.INS_SIGN) # "SAMP" in little-endian # uint32_t version; w.uint32(self.version) # uint8_t smp_num[ 128 ]; f.write(self.note_samples.bytes) # uint32_t max_version; w.uint32(self.max_version) # int32_t editor_cursor; w.int32(self.editor_cursor) # int32_t editor_selected_size; w.int32(self.editor_selected_size) yield b"CHNM", pack("<I", 0) yield b"CHDT", f.getvalue() f.close() def envelope_config_chunks(self): yield b"CHNM", pack("<I", 0x101) yield b"CHDT", b"\x00\x00\x00\x00\x00\x00" def sample_chunks(self, i, sample): f = BytesIO() w = _StructWriter(f) # uint32_t length; w.uint32(sample.frames) # uint32_t reppnt; w.uint32(sample.loop_start) # uint32_t replen; w.uint32(sample.loop_len) # uint8_t volume; w.uint8(sample.volume) # int8_t finetune; w.int8(sample.finetune) # uint8_t type; sustain_flag = 4 if sample.loop_sustain else 0 format_flag = { self.Format.int8: 0x00, self.Format.int16: 0x10, self.Format.float32: 0x20, }[sample.format] channels_flag = {self.Channels.mono: 0x00, self.Channels.stereo: 0x40}[ sample.channels ] loop_format_flags = ( sample.loop_type.value | format_flag | channels_flag | sustain_flag ) w.uint8(loop_format_flags) # uint8_t panning; w.uint8(sample.panning + 0x80) # int8_t relative_note; w.int8(sample.relative_note) # uint8_t reserved2; w.uint8(sample.reserved2) # char name[ 22 ]; w.char(sample.name, 22) # uint32_t start_pos; w.uint32(sample.start_pos) yield b"CHNM", pack("<I", i * 2 + 1) yield b"CHDT", f.getvalue() f.close() yield b"CHNM", pack("<I", i * 2 + 2) yield b"CHDT", sample.data yield b"CHFF", pack("<I", sample.format.value | sample.channels.value) yield b"CHFR", pack("<I", sample.rate) def load_chunk(self, chunk): if self.is_legacy is not False: self.legacy_chunks.append(chunk) chnm = chunk.chnm chdt = chunk.chdt if chnm == self.options_chnm: self.load_options(chunk) elif chnm == 0: self.load_instrument(chunk) elif chnm < 0x101 and chnm % 2 == 1: self.load_sample_meta(chunk) elif chnm < 0x101 and chnm % 2 == 0: self.load_sample_data(chunk) elif chnm == 0x101: self._unknown_0x101 = chdt elif chnm == 0x102: self.volume_envelope.load_chdt(chdt) elif chnm == 0x103: self.panning_envelope.load_chdt(chdt) elif chnm == 0x104: self.pitch_envelope.load_chdt(chdt) elif 0x105 <= chnm <= 0x108: self.effect_control_envelopes[chnm - 0x105].load_chdt(chdt) elif chnm == 0x10A: self.effect = read_sunvox_file(BytesIO(chdt)) def load_instrument(self, chunk): data = chunk.chdt r = _StructReader(data) vol = self.volume_envelope pan = self.panning_envelope # $0000 uint32_t unused1; self.unused1 = r.uint32() # $0004 char name[ 22 ]; self.instrument_name = r.char(22) # $001a uint16_t unused2; self.unused2 = r.uint16() # $001c uint16_t samples_num; _ = r.uint16() # $001e uint16_t unused3; self.unused3 = r.uint16() # $0020 uint32_t unused4; self.unused4 = r.uint32() # $0024 uint8_t smp_num_old[ 96 ]; self.note_samples.bytes = r.bytes(96) # $0084 uint16_t volume_points_old[ XI_ENV_POINTS * 2 ]; vol._legacy_point_bytes = r.bytes(self.XI_ENV_POINTS * 2 * 2) # $0086 uint16_t panning_points_old[ XI_ENV_POINTS * 2 ]; pan._legacy_point_bytes = r.bytes(self.XI_ENV_POINTS * 2 * 2) # $0088 uint8_t volume_points_num_old; vol._legacy_active_points = r.uint8() # $0089 uint8_t panning_points_num_old; pan._legacy_active_points = r.uint8() # $008a uint8_t vol_sustain_old; vol._legacy_sustain_point = r.uint8() # $008b uint8_t vol_loop_start_old; vol._legacy_loop_start_point = r.uint8() # $008c uint8_t vol_loop_end_old; vol._legacy_loop_end_point = r.uint8() # $008d uint8_t pan_sustain_old; pan._legacy_sustain_point = r.uint8() # $008e uint8_t pan_loop_start_old; pan._legacy_loop_start_point = r.uint8() # $008f uint8_t pan_loop_end_old; pan._legacy_loop_end_point = r.uint8() # $0090 uint8_t volume_type_old; vol._legacy_bitmask = r.uint8() # $0091 uint8_t panning_type_old; pan._legacy_bitmask = r.uint8() # $0092 uint8_t vibrato_type; self.vibrato_type = self.VibratoType(r.uint8()) # $0093 uint8_t vibrato_sweep; self.vibrato_attack = r.uint8() # $0094 uint8_t vibrato_depth; self.vibrato_depth = r.uint8() # $0095 uint8_t vibrato_rate; self.vibrato_rate = r.uint8() # $0096 uint16_t volume_fadeout; self.volume_fadeout = r.uint16() # $0098 uint8_t volume_old; self.volume_old = r.uint8() # $0099 int8_t finetune; self.ins_finetune = r.int8() # $009a uint8_t unused5; self.unused5 = r.uint8() # $009b int8_t relative_note; self.ins_relative_note = r.int8() # $009c uint32_t unused6; self.unused6 = r.uint32() # $00a0 uint32_t sign; sign = r.char(4) if sign != self.INS_SIGN: log.warning("legacy signature %r != %r", sign, self.INS_SIGN) self.is_legacy = True # $00a4 uint32_t version; self.version = r.uint32() # $00a8 uint8_t smp_num[ 128 ]; self.note_samples.bytes = r.char(128) # $0128 uint32_t max_version; self.max_version = r.uint32(self.INS_VERSION) # $012b int32_t editor_cursor; self.editor_cursor = r.int32(0) # $012f int32_t editor_selected_size; self.editor_selected_size = r.int32(0) if not self.is_legacy and len(data) >= 0x190: log.warning(f"legacy instrument data of length {len(data)}") self.is_legacy = True if not self.is_legacy: self.is_legacy = False self.legacy_chunks = None def load_sample_meta(self, chunk): index = (chunk.chnm - 1) // 2 sample = self.samples[index] = self.Sample() data = chunk.chdt r = _StructReader(data) # uint32_t length; sample._length = r.uint32() # uint32_t reppnt; sample.loop_start = r.uint32() # uint32_t replen; sample.loop_len = r.uint32() # uint8_t volume; sample.volume = r.uint8() # int8_t finetune; sample.finetune = r.int8() # uint8_t type; loop_format_flags = r.uint8() loop = loop_format_flags & (0 | 1 | 2) sample.loop_type = self.LoopType(loop) format = loop_format_flags & (0x00 | 0x10 | 0x20) sample.format = { 0x00: self.Format.int8, 0x10: self.Format.int16, 0x20: self.Format.float32, }[format] if loop_format_flags & 0x40: sample.channels = self.Channels.stereo else: sample.channels = self.Channels.mono sample.loop_sustain = bool(loop_format_flags & 4) # uint8_t panning; sample.panning = r.uint8() - 0x80 # int8_t relative_note; sample.relative_note = r.int8() # uint8_t reserved2; sample.reserved2 = r.uint8() # char name[ 22 ]; sample.name = r.char(22) # uint32_t start_pos; sample.start_pos = r.uint32(0) def load_sample_data(self, chunk): index = (chunk.chnm - 2) // 2 sample = self.samples[index] sample.data = chunk.chdt format = chunk.chff & 0x07 or 1 sample.format = self.Format(format) if sample.format is None: sample.format = self.Format.int8 sample.channels = self.Channels(chunk.chff & 0x08) sample.rate = chunk.chfr def finalize_load(self): if not self.volume_envelope.loaded: self._upgrade_envelopes() def _upgrade_envelopes(self): log.info( _F( "Upgrading Sampler{} to infinite envelope format", "[{}]".format(self.index) if self.index is not None else "", ) ) vol = self.volume_envelope pan = self.panning_envelope vol.bitmask = vol._legacy_bitmask vol.sustain_point = vol._legacy_sustain_point vol.loop_start_point = vol._legacy_loop_start_point vol.loop_end_point = vol._legacy_loop_end_point pan.bitmask = pan._legacy_bitmask pan.sustain_point = pan._legacy_sustain_point pan.loop_start_point = pan._legacy_loop_start_point pan.loop_end_point = pan._legacy_loop_end_point vol_x_points = [ unpack("<H", vol._legacy_point_bytes[4 * i : 4 * i + 2])[0] for i in range(vol._legacy_active_points) ] pan_x_points = [ unpack("<H", pan._legacy_point_bytes[4 * i : 4 * i + 2])[0] for i in range(pan._legacy_active_points) ] vol_y_points = [ unpack("<H", vol._legacy_point_bytes[4 * i + 2 : 4 * i + 4])[0] * 0x200 + vol.range[0] for i in range(vol._legacy_active_points) ] pan_y_points = [ unpack("<H", pan._legacy_point_bytes[4 * i + 2 : 4 * i + 4])[0] * 0x200 + pan.range[0] for i in range(pan._legacy_active_points) ] vol.points = [ (vol_x_points[i], vol_y_points[i]) for i in range(vol._legacy_active_points) ] pan.points = [ (pan_x_points[i], pan_y_points[i]) for i in range(pan._legacy_active_points) ]
class _StructWriter: def __init__(self, f: BinaryIO): self._f = f def char(self, value: bytes, width: int) -> None: self._f.write(value.ljust(width, b"\0")[:width]) def int8(self, value: int) -> None: self._f.write(pack("<b", value)) def uint8(self, value: int) -> None: self._f.write(pack("<B", value)) def int16(self, value: int) -> None: self._f.write(pack("<h", value)) def uint16(self, value: int) -> None: self._f.write(pack("<H", value)) def int32(self, value: int) -> None: self._f.write(pack("<i", value)) def uint32(self, value: int) -> None: self._f.write(pack("<I", value)) class _StructReader: def __init__(self, data: bytes): self._data = data self._index = 0 def bytes(self, length: int) -> bytes: start = self._index new_index = start + length buf = self._data[start:new_index] self._index = new_index return buf def char(self, length: int) -> bytes: return self.bytes(length).rstrip(b"\0") def skip(self, length: int) -> None: self._index += length def int8(self, default=None) -> int: return self._read("<b", 1, default) def uint8(self, default=None) -> int: return self._read("<B", 1, default) def int16(self, default=None) -> int: return self._read("<h", 2, default) def uint16(self, default=None) -> int: return self._read("<H", 2, default) def int32(self, default=None) -> int: return self._read("<i", 4, default) def uint32(self, default=None) -> int: return self._read("<I", 4, default) def _read(self, spec: str, length: int, default=None) -> int: start = self._index new_index = start + length buf = self._data[start:new_index] if len(buf) < length: if default is None: raise RuntimeError("default not provided") return default self._index = new_index return unpack(spec, buf)[0]