работающая стеганография LSB и DCT
я устал... DCT убивает
This commit is contained in:
191
core/dct.py
191
core/dct.py
@@ -0,0 +1,191 @@
|
||||
"""
|
||||
Модуль DCT стеганографии с квантованием (JPEG-style)
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from scipy.fftpack import dct, idct
|
||||
from .utils import text_to_bits, bits_to_text
|
||||
|
||||
BLOCK_SIZE = 8
|
||||
|
||||
# Стандартная таблица квантования JPEG для качества 50%
|
||||
QUANT_TABLE = np.array([
|
||||
[16, 11, 10, 16, 24, 40, 51, 61],
|
||||
[12, 12, 14, 19, 26, 58, 60, 55],
|
||||
[14, 13, 16, 24, 40, 57, 69, 56],
|
||||
[14, 17, 22, 29, 51, 87, 80, 62],
|
||||
[18, 22, 37, 56, 68, 109, 103, 77],
|
||||
[24, 35, 55, 64, 81, 104, 113, 92],
|
||||
[49, 64, 78, 87, 103, 121, 120, 101],
|
||||
[72, 92, 95, 98, 112, 100, 103, 99]
|
||||
])
|
||||
|
||||
|
||||
def _get_zigzag_order() -> list:
|
||||
"""Zigzag порядок для матрицы 8x8."""
|
||||
zigzag = []
|
||||
for s in range(15):
|
||||
if s % 2 == 0:
|
||||
for i in range(min(s, 7), max(0, s - 7) - 1, -1):
|
||||
j = s - i
|
||||
if 0 <= i < 8 and 0 <= j < 8:
|
||||
zigzag.append(i * 8 + j)
|
||||
else:
|
||||
for i in range(max(0, s - 7), min(s, 7) + 1):
|
||||
j = s - i
|
||||
if 0 <= i < 8 and 0 <= j < 8:
|
||||
zigzag.append(i * 8 + j)
|
||||
return zigzag
|
||||
|
||||
|
||||
ZIGZAG_ORDER = _get_zigzag_order()
|
||||
MID_FREQ_INDICES = ZIGZAG_ORDER[1:40]
|
||||
|
||||
|
||||
def _dct_quantize(block: np.ndarray) -> np.ndarray:
|
||||
"""DCT + квантование."""
|
||||
dct_block = dct(dct(block, axis=0, norm='ortho'), axis=1, norm='ortho')
|
||||
quantized = np.round(dct_block / QUANT_TABLE)
|
||||
return quantized.astype(np.int32)
|
||||
|
||||
|
||||
def _idct_dequantize(quantized: np.ndarray) -> np.ndarray:
|
||||
"""Обратное квантование + IDCT."""
|
||||
dct_block = quantized * QUANT_TABLE
|
||||
block = idct(idct(dct_block, axis=0, norm='ortho'), axis=1, norm='ortho')
|
||||
block = np.round(block)
|
||||
return np.clip(block, 0, 255).astype(np.uint8)
|
||||
|
||||
|
||||
def _extract_bits_from_block(block_quant: np.ndarray, max_bits: int) -> list:
|
||||
"""Извлекает биты из квантованных коэффициентов."""
|
||||
extracted = []
|
||||
coeff_flat = block_quant.flatten()
|
||||
|
||||
for idx in MID_FREQ_INDICES:
|
||||
if len(extracted) >= max_bits:
|
||||
break
|
||||
if idx >= len(coeff_flat):
|
||||
break
|
||||
bit = int(coeff_flat[idx]) & 1
|
||||
extracted.append(bit)
|
||||
return extracted
|
||||
|
||||
|
||||
def _embed_bits_in_block(block_quant: np.ndarray, bits: list, bit_index: int) -> tuple:
|
||||
"""Внедряет биты в квантованные коэффициенты."""
|
||||
modified = block_quant.copy()
|
||||
current_idx = bit_index
|
||||
coeff_flat = modified.flatten()
|
||||
|
||||
for idx in MID_FREQ_INDICES:
|
||||
if current_idx >= len(bits):
|
||||
break
|
||||
if idx >= len(coeff_flat):
|
||||
break
|
||||
new_val = (int(coeff_flat[idx]) & 0xFE) | bits[current_idx]
|
||||
coeff_flat[idx] = new_val
|
||||
current_idx += 1
|
||||
|
||||
return coeff_flat.reshape(BLOCK_SIZE, BLOCK_SIZE), current_idx
|
||||
|
||||
|
||||
def encode_dct(image_path: str, message: str, output_path: str) -> bool:
|
||||
"""Скрывает сообщение в изображении методом DCT."""
|
||||
img = Image.open(image_path).convert('L')
|
||||
pixels = np.array(img, dtype=np.float64)
|
||||
height, width = pixels.shape
|
||||
|
||||
# Кодируем сообщение в байты
|
||||
message_bytes = message.encode('utf-8')
|
||||
msg_length = len(message_bytes)
|
||||
|
||||
# Формат: [длина: 32 бита] + [сообщение]
|
||||
# 32 бита = 4 байта, достаточно для сообщений до 4 ГБ
|
||||
length_bits = format(msg_length, '032b')
|
||||
message_bits = text_to_bits(message)
|
||||
|
||||
# Собираем всё вместе
|
||||
all_bits = length_bits + message_bits
|
||||
bit_list = [int(b) for b in all_bits]
|
||||
total_bits = len(bit_list)
|
||||
|
||||
# Проверяем вместимость
|
||||
blocks_per_row = width // BLOCK_SIZE
|
||||
blocks_per_col = height // BLOCK_SIZE
|
||||
max_bits = blocks_per_row * blocks_per_col * len(MID_FREQ_INDICES)
|
||||
|
||||
if total_bits > max_bits:
|
||||
print(f"Ошибка: сообщение слишком большое.")
|
||||
print(f"Доступно: {max_bits}, требуется: {total_bits}")
|
||||
return False
|
||||
|
||||
modified_pixels = pixels.copy()
|
||||
bit_index = 0
|
||||
|
||||
for i in range(0, height - BLOCK_SIZE + 1, BLOCK_SIZE):
|
||||
for j in range(0, width - BLOCK_SIZE + 1, BLOCK_SIZE):
|
||||
if bit_index >= total_bits:
|
||||
break
|
||||
|
||||
block = pixels[i:i+BLOCK_SIZE, j:j+BLOCK_SIZE]
|
||||
quant_block = _dct_quantize(block)
|
||||
quant_block, bit_index = _embed_bits_in_block(quant_block, bit_list, bit_index)
|
||||
new_block = _idct_dequantize(quant_block)
|
||||
modified_pixels[i:i+BLOCK_SIZE, j:j+BLOCK_SIZE] = new_block
|
||||
|
||||
if bit_index >= total_bits:
|
||||
break
|
||||
|
||||
result_img = Image.fromarray(modified_pixels.astype(np.uint8), mode='L')
|
||||
result_img.save(output_path)
|
||||
print(f"Успешно! Спрятано {total_bits} бит ({total_bits // 8} байт)")
|
||||
return True
|
||||
|
||||
|
||||
def decode_dct(image_path: str) -> str:
|
||||
"""Извлекает сообщение из изображения методом DCT."""
|
||||
img = Image.open(image_path).convert('L')
|
||||
pixels = np.array(img, dtype=np.float64)
|
||||
height, width = pixels.shape
|
||||
|
||||
# Извлекаем все биты
|
||||
all_bits = []
|
||||
|
||||
for i in range(0, height - BLOCK_SIZE + 1, BLOCK_SIZE):
|
||||
for j in range(0, width - BLOCK_SIZE + 1, BLOCK_SIZE):
|
||||
block = pixels[i:i+BLOCK_SIZE, j:j+BLOCK_SIZE]
|
||||
quant_block = _dct_quantize(block)
|
||||
bits_from_block = _extract_bits_from_block(quant_block, len(MID_FREQ_INDICES))
|
||||
all_bits.extend(bits_from_block)
|
||||
|
||||
# Читаем длину сообщения (первые 32 бита)
|
||||
if len(all_bits) < 32:
|
||||
return ""
|
||||
|
||||
length_bits = all_bits[:32]
|
||||
length_str = ''.join(str(b) for b in length_bits)
|
||||
msg_length = int(length_str, 2)
|
||||
|
||||
# Проверяем, что длина разумная
|
||||
if msg_length > len(all_bits) // 8:
|
||||
return ""
|
||||
|
||||
# Читаем сообщение
|
||||
message_bits = all_bits[32:32 + msg_length * 8]
|
||||
|
||||
# Дополняем до кратности 8 (хотя уже должно быть кратно)
|
||||
remainder = len(message_bits) % 8
|
||||
if remainder != 0:
|
||||
message_bits.extend([0] * (8 - remainder))
|
||||
|
||||
if len(message_bits) == 0:
|
||||
return ""
|
||||
|
||||
bits_string = ''.join(str(b) for b in message_bits)
|
||||
try:
|
||||
return bits_to_text(bits_string)
|
||||
except Exception as e:
|
||||
print(f"Ошибка при декодировании: {e}")
|
||||
return ""
|
||||
137
debug_dct_russian.py
Normal file
137
debug_dct_russian.py
Normal file
@@ -0,0 +1,137 @@
|
||||
"""
|
||||
Отладка DCT: русский текст
|
||||
"""
|
||||
|
||||
from core.dct import encode_dct, _apply_dct, _extract_bits_from_block, MID_FREQ_INDICES
|
||||
from core.utils import text_to_bits, bits_to_text
|
||||
from PIL import Image
|
||||
import numpy as np
|
||||
|
||||
print("=" * 60)
|
||||
print("Отладка DCT: русский текст")
|
||||
print("=" * 60)
|
||||
|
||||
# Создаем тестовое изображение
|
||||
img = Image.new('L', (400, 400), color=128)
|
||||
img.save("debug_russian.png")
|
||||
|
||||
# Русское сообщение
|
||||
message = "Привет"
|
||||
print(f"Оригинальное сообщение: '{message}'")
|
||||
print(f"Длина в символах: {len(message)}")
|
||||
|
||||
# Покажем биты оригинального сообщения
|
||||
bits_original = text_to_bits(message)
|
||||
print(f"\nОригинальные биты: {bits_original}")
|
||||
print(f"Длина битов: {len(bits_original)}")
|
||||
|
||||
# Добавляем стоп-маркер
|
||||
bits_with_stop = bits_original + '00000000'
|
||||
print(f"Биты со стоп-маркером: {bits_with_stop}")
|
||||
print(f"Длина со стоп-маркером: {len(bits_with_stop)}")
|
||||
|
||||
# Кодируем
|
||||
print("\n--- Кодирование ---")
|
||||
encode_dct("debug_russian.png", message, "debug_russian_encoded.png")
|
||||
|
||||
# Загружаем закодированное изображение и извлекаем биты
|
||||
img_encoded = Image.open("debug_russian_encoded.png").convert('L')
|
||||
pixels = np.array(img_encoded, dtype=np.float64)
|
||||
height, width = pixels.shape
|
||||
|
||||
print("\n--- Извлечение битов из блоков ---")
|
||||
all_bits = []
|
||||
block_count = 0
|
||||
|
||||
for i in range(0, height - 8 + 1, 8):
|
||||
for j in range(0, width - 8 + 1, 8):
|
||||
block = pixels[i:i+8, j:j+8]
|
||||
dct_block = _apply_dct(block)
|
||||
bits_from_block = _extract_bits_from_block(dct_block, len(MID_FREQ_INDICES))
|
||||
all_bits.extend(bits_from_block)
|
||||
block_count += 1
|
||||
|
||||
# Покажем первые 3 блока
|
||||
if block_count <= 3:
|
||||
print(f"Блок {block_count}: {bits_from_block[:30]}...")
|
||||
|
||||
print(f"\nВсего извлечено битов: {len(all_bits)}")
|
||||
|
||||
# Сравним оригинальные и извлеченные биты (первые 100)
|
||||
print("\n" + "=" * 60)
|
||||
print("Сравнение битов (первые 100):")
|
||||
print("=" * 60)
|
||||
print(f"Оригинал: {bits_with_stop[:100]}")
|
||||
print(f"Извлечено: {''.join(str(b) for b in all_bits[:100])}")
|
||||
|
||||
# Найдем позиции, где биты отличаются
|
||||
print("\n" + "=" * 60)
|
||||
print("Различия в битах:")
|
||||
print("=" * 60)
|
||||
diff_count = 0
|
||||
for i in range(min(len(bits_with_stop), len(all_bits))):
|
||||
orig_bit = int(bits_with_stop[i])
|
||||
ext_bit = all_bits[i]
|
||||
if orig_bit != ext_bit:
|
||||
diff_count += 1
|
||||
if diff_count <= 20:
|
||||
print(f"Позиция {i}: было {orig_bit}, стало {ext_bit}")
|
||||
print(f"Всего различий: {diff_count}")
|
||||
|
||||
# Попробуем декодировать извлеченные биты без стоп-маркера
|
||||
print("\n" + "=" * 60)
|
||||
print("Декодирование извлеченных битов:")
|
||||
print("=" * 60)
|
||||
|
||||
# Преобразуем биты в байты и покажем их
|
||||
extracted_bytes = []
|
||||
for k in range(0, min(len(all_bits), 200), 8):
|
||||
if k + 8 <= len(all_bits):
|
||||
byte_bits = all_bits[k:k+8]
|
||||
byte_val = 0
|
||||
for b in byte_bits:
|
||||
byte_val = (byte_val << 1) | b
|
||||
extracted_bytes.append(byte_val)
|
||||
# Покажем как число и как символ
|
||||
if 32 <= byte_val < 127:
|
||||
char = chr(byte_val)
|
||||
else:
|
||||
char = f'[{byte_val:02x}]'
|
||||
print(f"Байт {k//8:2d}: {byte_val:3d} (0x{byte_val:02x}) -> {char} биты={byte_bits}")
|
||||
|
||||
# Найдем стоп-маркер
|
||||
print("\n" + "=" * 60)
|
||||
print("Поиск стоп-маркера:")
|
||||
print("=" * 60)
|
||||
|
||||
stop_counter = 0
|
||||
stop_position = -1
|
||||
for i, bit in enumerate(all_bits):
|
||||
if bit == 0:
|
||||
stop_counter += 1
|
||||
if stop_counter == 8:
|
||||
stop_position = i - 7
|
||||
print(f"Стоп-маркер найден на позиции бита {stop_position}")
|
||||
break
|
||||
else:
|
||||
stop_counter = 0
|
||||
|
||||
if stop_position == -1:
|
||||
print("Стоп-маркер НЕ НАЙДЕН!")
|
||||
# Используем все биты
|
||||
bits_to_decode = all_bits
|
||||
else:
|
||||
bits_to_decode = all_bits[:stop_position]
|
||||
print(f"Битов до стоп-маркера: {len(bits_to_decode)}")
|
||||
|
||||
# Декодируем
|
||||
bits_string = ''.join(str(b) for b in bits_to_decode)
|
||||
try:
|
||||
decoded = bits_to_text(bits_string)
|
||||
print(f"\nДекодированное сообщение: '{decoded}'")
|
||||
if decoded == message:
|
||||
print("✅ УСПЕХ!")
|
||||
else:
|
||||
print("❌ Сообщения не совпадают")
|
||||
except Exception as e:
|
||||
print(f"\nОшибка декодирования: {e}")
|
||||
80
debug_decode.py
Normal file
80
debug_decode.py
Normal file
@@ -0,0 +1,80 @@
|
||||
cat > debug_decode.py << 'EOF'
|
||||
print("Отладка decode_dct")
|
||||
|
||||
from core.dct import _apply_dct, _extract_bits_from_block, MID_FREQ_INDICES
|
||||
from core.utils import bits_to_text
|
||||
from PIL import Image
|
||||
import numpy as np
|
||||
|
||||
# Загружаем закодированное изображение
|
||||
print("Загружаем out_min.png...")
|
||||
img = Image.open("out_min.png").convert('L')
|
||||
pixels = np.array(img, dtype=np.float64)
|
||||
height, width = pixels.shape
|
||||
print(f"Размер: {height}x{width}")
|
||||
|
||||
# Извлекаем все биты
|
||||
print("\nИзвлекаем биты из всех блоков...")
|
||||
all_bits = []
|
||||
block_count = 0
|
||||
|
||||
for i in range(0, height - 8 + 1, 8):
|
||||
for j in range(0, width - 8 + 1, 8):
|
||||
block = pixels[i:i+8, j:j+8]
|
||||
dct_block = _apply_dct(block)
|
||||
bits_from_block = _extract_bits_from_block(dct_block, len(MID_FREQ_INDICES))
|
||||
all_bits.extend(bits_from_block)
|
||||
block_count += 1
|
||||
|
||||
if block_count <= 5:
|
||||
print(f"Блок {block_count}: биты={bits_from_block[:20]}...")
|
||||
|
||||
print(f"\nВсего извлечено битов: {len(all_bits)}")
|
||||
|
||||
# Покажем первые 50 битов
|
||||
print(f"\nПервые 50 битов: {all_bits[:50]}")
|
||||
|
||||
# Поиск стоп-маркера (8 нулей подряд)
|
||||
print("\nПоиск стоп-маркера (8 нулей подряд)...")
|
||||
stop_counter = 0
|
||||
stop_position = -1
|
||||
for i, bit in enumerate(all_bits):
|
||||
if bit == 0:
|
||||
stop_counter += 1
|
||||
if stop_counter == 8:
|
||||
stop_position = i - 7
|
||||
print(f"Стоп-маркер найден на позиции бита {stop_position}")
|
||||
break
|
||||
else:
|
||||
stop_counter = 0
|
||||
|
||||
if stop_position == -1:
|
||||
print("Стоп-маркер НЕ НАЙДЕН!")
|
||||
# Покажем где встречаются нули
|
||||
print("\nПоиск 8 нулей подряд:")
|
||||
for i in range(len(all_bits) - 7):
|
||||
if all_bits[i:i+8] == [0,0,0,0,0,0,0,0]:
|
||||
print(f" Найдено на позиции {i}")
|
||||
break
|
||||
else:
|
||||
print(f"Битов до стоп-маркера: {stop_position}")
|
||||
bits_to_use = all_bits[:stop_position]
|
||||
|
||||
# Преобразуем в байты
|
||||
print("\nБайты до стоп-маркера:")
|
||||
for k in range(0, len(bits_to_use), 8):
|
||||
if k + 8 <= len(bits_to_use):
|
||||
byte_bits = bits_to_use[k:k+8]
|
||||
byte_val = 0
|
||||
for b in byte_bits:
|
||||
byte_val = (byte_val << 1) | b
|
||||
print(f" Байт {k//8}: {byte_val:3d} (0x{byte_val:02x}) биты={byte_bits}")
|
||||
|
||||
# Декодируем
|
||||
bits_string = ''.join(str(b) for b in bits_to_use)
|
||||
try:
|
||||
decoded = bits_to_text(bits_string)
|
||||
print(f"\nДекодировано: '{decoded}'")
|
||||
except Exception as e:
|
||||
print(f"\nОшибка декодирования: {e}")
|
||||
EOF
|
||||
5
main.py
5
main.py
@@ -17,6 +17,7 @@ import argparse
|
||||
import sys
|
||||
import os
|
||||
from core.lsb import encode_lsb, decode_lsb
|
||||
from core.dct import encode_dct, decode_dct
|
||||
from core.utils import calculate_capacity, calculate_psnr
|
||||
|
||||
|
||||
@@ -102,7 +103,7 @@ def encode_command(args) -> None:
|
||||
if args.method == 'lsb':
|
||||
success = encode_lsb(args.image, message, args.output)
|
||||
elif args.method == 'dct':
|
||||
print("Метод DCT пока не реализован")
|
||||
success = encode_dct(args.image, message, args.output)
|
||||
success = False
|
||||
else:
|
||||
print(f"Неизвестный метод: {args.method}")
|
||||
@@ -125,7 +126,7 @@ def decode_command(args) -> None:
|
||||
if args.method == 'lsb':
|
||||
message = decode_lsb(args.image)
|
||||
elif args.method == 'dct':
|
||||
print("Метод DCT пока не реализован")
|
||||
message = decode_dct(args.image)
|
||||
message = None
|
||||
else:
|
||||
print(f"Неизвестный метод: {args.method}")
|
||||
|
||||
13
minimal_test.py
Normal file
13
minimal_test.py
Normal file
@@ -0,0 +1,13 @@
|
||||
print("Скрипт запустился")
|
||||
from core.dct import encode_dct, decode_dct
|
||||
print("Импорт прошел")
|
||||
from PIL import Image
|
||||
print("Создаем изображение")
|
||||
img = Image.new('L', (400, 400), 128)
|
||||
img.save("test_min.png")
|
||||
print("Изображение создано")
|
||||
result = encode_dct("test_min.png", "Hi", "out_min.png")
|
||||
print(f"Результат кодирования: {result}")
|
||||
decoded = decode_dct("out_min.png")
|
||||
print(f"Декодировано: '{decoded}'")
|
||||
print("Готово")
|
||||
Reference in New Issue
Block a user