Data compression [fr]

Cette page présente un système relativement simple de compression, assez proche dans l’idée de certain des systèmes existants, et basé sur la prédiction par reconnaissance partielle.

Au passage, je présente l’incontournable codage de Huffman, mais également les chaines de Markov.

Idée de base

Pour compresser un fichier texte, on peut partir d’un constat tout simple : sur les 256 valeurs possibles pour un octet, seule quelques valeurs sont vraiment utilisées (celles qui correspondent aux caractères de la langue).

De plus, certaines valeurs, comme celle correspondant aux lettres « a » et « e » vont être bien plus utilisées que les autres encore.

On peut donc utiliser une technique toute simple, qui consiste à coder les lettres les plus fréquentes sur un nombre de bits plus faible que les lettres les moins fréquentes : c’est le codage de Huffman.

Le codage de Huffman

Supposons qu’on dispose d’un ensemble d’éléments, associés à leur fréquence (par exemple les lettres de l’alphabet avec leur fréquence en Français).

Le codage de Huffman va permettre de construire un arbre binaire, dans lequel chaque feuille représentera un élément de notre ensemble.

L’arbre aura une forme telle que les feuilles qui correspondent aux éléments les plus fréquents seront situées près de la racine, alors que les éléments très peu fréquents nécessiteront un parcours complexe depuis la racine pour y accéder.

Pour référencer un élément précis, il suffira d’indiquer le chemin depuis la racine vers la feuille correspondant à l’élément. À chaque nœud, le fait de prendre la feuille de gauche sera noté par un 0, et le fait de prendre celle de droite par un 1. Une fois qu’on est arrivé à une feuille, on a donc notre élément, et on retourne à la racine pour obtenir l’élément suivant, etc…

Du coup, les éléments les plus fréquents auront un codage ultra-court.

Le codage de Huffman n’est pas le plus efficace existant (cf le codage arithmétique),

mais reste très simple à comprendre, et facile à mettre en œuvre.

On peut faire mieux…

Avec notre système de compression initial, notre archive pourra donc contenir un en-tête décrivant l’arbre de Huffman. Ensuite, il suffit de mettre une suite de bits qui ira taper dans les feuilles le l’arbre, et on peut donc arriver à un taux de compression déjà appréciable pour du texte.

Toutefois, on peut vite remarquer quelque chose : si on vient de décoder une virgule, il est absurde de considérer qu’après, il y a une forte probabilité d’avoir un « a » ou un « e ». C’est possible, mais on sait qu’on aura quasiment toujours un espace, donc autant coder l’espace sur le moins de bits possible.

Pour cela, on peut utiliser les chaînes de Markov (un concept très simple).

Chaînes de Markov

Le principe des Chaînes de Markov, c’est qu’à partir de l’état précédent (la dernière lettre vue), on est en mesure de donner la probabilité de tomber sur chaque autre lettre.

Mieux encore, on peut ne pas utiliser l’état précédent, mais les N états précédents : on fait alors une chaîne de Markov d’ordre N. Ainsi, on peut regarder les deux dernières lettres vues, pour en déduire les probabilités pour la lettre suivante avec encore plus de fiabilité.

Interlude ludique

On peut utiliser les chaînes de Markov pour générer des suites de mots à partir d’un texte de base.

Le principe est simple : on part d’un mot choisi au hasard au début d’une phrase du texte. Ensuite, on va boucler en prenant le dernier mot choisi, et en regardant les mots qui suivent ce dernier dans le texte de base. Parmi tous ces mots, il suffit d’en choisir un au hasard (en respectant les proportions, si un mot apparaît deux fois plus, il aura deux fois plus de chances d’être pris), et de recommencer.

Pour plus de vraisemblance, on va généralement chercher les mots qui suivront non pas le dernier mot généré, mais les deux ou trois derniers mots générés, ainsi le texte fabriqué aura encore plus l’air normal.

Ceci permet de générer de nouveaux textes n’ayant pas vraiment de sens, mais qui ressemblent complètement à des textes normaux.

Par exemple, à partir des Harry Potter, on peut obtenir ceci :

  • Malgré lui, Harry pencha la tête derrière les jambes de Rogue et de Maugrey…

  • Ron et Hermione les accompagna en marchant à si grandes enjambées que Harry et Hermione prirent docilement leur chocolat.

Ou même, si on part des Harry Potter et de la Bible, on peut obtenir des mélanges surprenants :

  • Ainsi, avec une fronde et une pierre, David fut plus fort que jamais. Harry, Ron et Hermione le regardèrent d’un air interloqué.

  • Harry baissa les yeux vers le ciel, il les bénit. Puis, il les rompit, et les donna aux disciples, en disant: Prenez, mangez, ceci est mon corps.

  • 11:43 Hanan, fils de Jigdalia, homme de Dieu, bénit les enfants d’Israël, après les avoir regardés pendant un bon moment avant de comprendre de quoi Ron voulait parler.

  • 16:20 Isaï prit un eunuque qui était Nicolas Flamel. Il se souvient pas de chose le crime de Sodome, ta soeur.

Et si on rajoute du Marx :

  • 29:7 Si je ne les avez attrapés ? Barty ! Qu’est-ce que c’est toi qui donnes la force de travail. La valeur qu’il crée en six heures, la pratique du sortilège Doloris.

  • Ceins tes reins comme un canon en les forçant à adopter le nouveau mode de production fondée sur les serpents pour en dilater encore l’échelle des entreprises.

Si vous voulez vous amuser, le script est ici.

En pratique

On peut combiner cette idée avec le fait que la portion déjà décompressée de texte peut servir pour les prédictions : au début, on n’aura aucune information sur le texte et la compression sera peu efficace, et au fur et à mesure, les prédictions s’affineront et le texte sera de mieux en mieux compressé.

Du coup, on n’a même plus besoin de stocker d’arbre de Huffman dans le fichier : l’arbre est calculé à la compression et à la décompression de chaque octet, en fonction du texte déjà traité.

Et encore mieux…

Autre méthode de prédiction

Jusqu’ici, on prédit juste l’octet qui va suivre le dernier octet traité, de manière la plus efficace possible.

Mais il est même possible de tenter de prédire les 2 octets suivants, voire carrément des mots ou des bouts de phrases : par exemple, si, dans le texte courant, j’écris « Hu », il est à peu près certain que la suite sera « ffman ».

On peut donc regarder si on trouve les derniers caractères traités dans le texte, et voire ce qui les suit le plus fréquemment.

Du coup, on pourra ensuite coder des mots ou bouts de phrases en quelques bits, si ils étaient facilement prévisibles.

Et si on les combinait ?

On a trouvé pas mal de méthodes pour prédire ce qui va suivre ce qu’on vient de traiter.

Du coup, pourquoi ne pas toutes les combiner ? Pour chaque prédiction obtenue, il suffit de faire la moyenne de sa probabilité donnée par chaque prédicteur.

Pour améliorer un peu la chose, on peut donner un coefficient à chaque prédicteur, comme ça on peut avoir un prédicteur très pertinent (qui donnera une probabilité très grande, codant sa prédiction en peu de bits), mais si la prédiction échoue, on pourra se rabattre sur des prédictions données par d’autres prédicteurs plus généralistes.

Conclusion

Et dans la vraie vie, ça donne quoi ?

Beaucoup d’algorithmes de compression sont plus simples, et auront plutôt tendance à ne pas vraiment prédire, mais plutôt faire des références vers des suites d’octets qui sont déjà passées, tout simplement.

Toutefois, les meilleurs algorithmes de compression existants, comme PAQ, ont une base très similaire à la méthode que j’ai présentée : prédiction par reconnaissance partielle et pondération de contexes.

Eux aussi vont prédire les probabilités d’apparition de la suite à partir des derniers caractères décodés, et eux aussi disposent d’un paquet de prédicteurs très spécialisés qui renvoient des probabilités qui sont combinées entre elles par un mixeur (qui n’est pas juste « on va faire une moyenne pondérée de toutes les probabilités », comme dans mon cas).

Pour aller plus dans les détails, je vous recommande chaudement ce cours très complet.

À vous de jouer

Vous voulez vous amuser ? Dans mon petit script, tout se base sur les prédicteurs, définis dans l’initialisation de la classe Engine. Les prédictions se basent sur les probabilités renvoyées par tous les prédicteurs (pondérés par les coefficients à leur droite). Si vous changez les coefficients de pondération, ou que vous modifiez les paramètres ou enlevez/ajoutez des prédicteurs, il est possible d’arriver à compresser plus efficacement.

Encore mieux, il est possible de coder votre propre prédicteur, et de voir si il s’avère plus efficace que ceux que j’ai implémentés.

Il y a aurait aussi moyen de faire varier les coefficients de pondération de chaque prédicteur : certaines parties se compresseront mieux en utilisant les probabilités renvoyées par tel prédicteur, d’autre par tel autre prédicteur…

Annexe : Place au code

#!/usr/bin/env python3

import heapq
import collections
import sys
import os

# Little hack to have binary stdin/stdout
sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb')
sys.stdout = os.fdopen(sys.stdout.fileno(), 'wb')

CHUNK_SIZE = 1024
EOF = -1


### PREDICTORS ###

class MarkovPredictor(object):
    """ Can predict the « len_after » next characters, based on the « len_before » previous characters seen. """
    def __init__(self, len_before=1, len_after=1):
        self.len_before, self.len_after = len_before, len_after
        self.text_end = b''
        self.before = collections.defaultdict(lambda: collections.defaultdict(int))
        self.count = 0

    def feed(self, text):
        len_added = len(text)
        text = self.text_end[-self.len_before-self.len_after:] + text
        self.text_end = text
        self.count += len_added

        for i in range(len(text) - len_added - self.len_after, len(text) - self.len_after):
            if i < self.len_before:
                continue
            self.before[text[i-self.len_before:i]][text[i:i+self.len_after]] += 1

    def predict(self):
        if self.len_before > 0:
            text = self.text_end[-self.len_before:]
        else:
            text = b''
        return {k: v / self.count for k, v in self.before[text].items()}


class FixedPredictor(object):
    """ Prediction based on a fixed set of probabilities. """
    def __init__(self, freqs):
        self.freqs = freqs

    def feed(self, text):
        pass

    def predict(self):
        return self.freqs


class FullCharsetPredictor(FixedPredictor):
    """ The full set of possible characters need to be known, or it will impossible to code them. """
    def __init__(self):
        freqs = {bytes((i,)): 1 for i in range(256)}
        freqs[EOF] = 1 # special data to indicate the end of the stream
        super(FullCharsetPredictor, self).__init__(freqs)


class UnboundedPredictor(object):
    """ A homemade predictor, that don't predict fixed-length strings, but can predict long ones. """
    def __init__(self, exponent=2, min_len_before=1):
        assert exponent >= 1 and min_len_before >= 1
        self.text = b''
        self.exponent = exponent
        self.min_len_before = min_len_before

    def feed(self, text):
        self.text += text

    def predict(self):
        text = self.text
        predictions = collections.defaultdict(float)
        predictions_todo = collections.defaultdict(list)

        if not text:
            return {}

        # On cherche directement à avoir le dernier caractère qui correspond,
        # c'est plus rapide que de se promener dans tout le texte.
        i = -1
        while True:
            i = text.find(text[-1], i + 1, -1)
            if i == -1:
                break

            # On compte le nombre de caractères identiques avec la fin du texte actuel
            matching_before = 1
            while text[i-matching_before] == text[-matching_before-1] and i - matching_before >= 0:
                matching_before += 1
            if matching_before >= self.min_len_before:
                predictions_todo[bytes((text[i+1],))].append((i, matching_before))

        while predictions_todo:
            prediction, l = predictions_todo.popitem()

            score = sum(i[1]**self.exponent for i in l) / len(text)
            if len(l) < 2:
                continue

            appended_count = 0
            created_count = 0

            for i, matching_before in l:
                if i + len(prediction) >= len(text) - 1:
                    continue

                p = predictions_todo[text[i+1:i+len(prediction)+2]]
                if p:
                    appended_count += 1
                else:
                    created_count += 1
                p.append((i, matching_before))

            if appended_count != len(l) - 1 or created_count != 1:
                predictions[prediction] += score

        return predictions


class RUnboundedPredictor(UnboundedPredictor):
    """ The same as the UnboundedPredictor, but its predictions are based
        only on the « length » last characters seen. """
    def __init__(self, length=1000, *args, **kwargs):
        self.length = length
        super(RUnboundedPredictor, self).__init__(*args, **kwargs)

    def feed(self, text):
        self.text = (self.text + text)[-self.length:]


### HUFFMAN ###

class TreeNode(object):
    def __init__(self, weight, label=None, left=None, right=None):
        assert label is not None or left is not None or right is not None
        self.label = label
        self.weight = weight
        self.left, self.right = left, right

    def __lt__(self, other):
        if self.weight == other.weight:
            try:
                return self.label < other.label
            except TypeError:
                return repr(self.label) < repr(other.label)
        return self.weight < other.weight

    def __gt__(self,other):
        if self.weight == other.weight:
            try:
                return self.label > other.label
            except TypeError:
                return repr(self.label) > repr(other.label)
        return self.weight > other.weight

    def is_leaf(self):
        return self.left is None and self.right is None

    def __repr__(self):
        if self.is_leaf():
            return 'TreeNode(%s, %s)' % (self.weight, self.label)
        return 'TreeNode(%s, %s, %s, %s)' % (self.weight, self.label, self.left, self.right)

def make_huffman_dict(trees):
    trees.sort()
    heapq.heapify(trees)

    while len(trees) > 1:
        childR, childL = heapq.heappop(trees), heapq.heappop(trees)
        parent = TreeNode(childL.weight + childR.weight, left=childL, right=childR)
        heapq.heappush(trees, parent)

    def build_dict(d, tree, prefix):
        if tree.is_leaf():
            d[tree.label] = prefix
        else:
            build_dict(d, tree.left, prefix + '0')
            build_dict(d, tree.right, prefix + '1')

    d = {}
    build_dict(d, trees[0], '')
    return d


### ENGINE ###

def binstr_to_bytes(binary_string):
    """ To convert a string like 01001101010101 to a series of bytes """
    assert len(binary_string) % 8 == 0
    output_buffer = bytearray()
    for i in range(0, len(binary_string), 8):
        output_buffer.append(int(binary_string[i:i+8], 2))
    return output_buffer

def bytes_to_binstr(b):
    """ To convert a series of bytes to a string containing its binary form """
    return ''.join('{:08b}'.format(i) for i in b)

class Engine():
    def __init__(self):
        self.predictors = {
                FullCharsetPredictor() : 1e-8,
                MarkovPredictor(0): 1,
                MarkovPredictor(): 5,
                UnboundedPredictor(2.5): 12,
        }

    def _huffman_dict(self):
        self.predictions = collections.defaultdict(lambda: [0, []])
        for predictor in self.predictors:
            for prediction, weight in predictor.predict().items():
                self.predictions[prediction][0] += weight * self.predictors[predictor]
                self.predictions[prediction][1].append(predictor)

        return make_huffman_dict([TreeNode(v[0], k) for k, v in self.predictions.items()])

    def compress(self, stream_in, stream_out):
        buffer_in = stream_in.read(CHUNK_SIZE)
        buffer_out = ''

        while buffer_in:
            huffman_dict = self._huffman_dict()

            to_encode = None
            best_ratio = 0
            for n in self.predictions:
                if n != EOF and buffer_in.startswith(n):
                    ratio = len(n) / len(huffman_dict[n])
                    if ratio > best_ratio:
                        to_encode = n
                        best_ratio = ratio

            buffer_out += huffman_dict[to_encode]
            if len(buffer_out) % 8 == 0:
                stream_out.write(binstr_to_bytes(buffer_out))
                buffer_out = ''

            buffer_in = buffer_in[len(to_encode):]
            if len(buffer_in) < CHUNK_SIZE:
                buffer_in = buffer_in + stream_in.read(CHUNK_SIZE)

            for predictor in self.predictors:
                predictor.feed(to_encode)

            #sys.stderr.write("{} bits encodés en {} bits.\n".format(len(to_encode)*8, len(huffman_dict[to_encode])))

        eof_sequence = self._huffman_dict()[-1]
        buffer_out += eof_sequence
        buffer_out += '0' * (8 - len(buffer_out) % 8)
        stream_out.write(binstr_to_bytes(buffer_out))

    def decompress(self, stream_in, stream_out):
        buffer_in = bytes_to_binstr(stream_in.read(CHUNK_SIZE))

        while buffer_in:
            for decompressed, huffman in self._huffman_dict().items():
                if buffer_in.startswith(huffman):
                    if decompressed == EOF:
                        return

                    stream_out.write(decompressed)

                    buffer_in = buffer_in[len(huffman):]
                    if len(buffer_in) < CHUNK_SIZE:
                        buffer_in += bytes_to_binstr(stream_in.read(CHUNK_SIZE))

                    for predictor in self.predictors:
                        predictor.feed(decompressed)

                    break

if __name__ == '__main__':
    engine = Engine()
    if sys.argv[1] == 'compress':
        engine.compress(sys.stdin, sys.stdout)
    elif sys.argv[1] == 'decompress':
        engine.decompress(sys.stdin, sys.stdout)

Ce code est fait en Python 3. Il est utilisable très facilement en console :

python3 script.py compress < fichier.txt > fichier_compressé

Et pour décompresser :

python3 script.py decompress < fichier_compressé > fichier.txt

Plein d’autres usages sont possible, puisque le script se base sur l’entrée et la sortie standard, et lit/écrit au fur et à mesure (streaming).

Attention toutefois : Il est horriblement lent. Vous voilà prévenus.