Project 3: Intelligent File Organizer
Welcome to your third complete Python application! We're building an intelligent file organization system that automatically categorizes, sorts, and manages files based on type, content, date, and custom rules.
Project Overview
FileMaster is an automation script that intelligently organizes files by:
- Automatic categorization - Documents, images, videos, music, archives
- Content analysis - File type detection and metadata extraction
- Date-based sorting - Creation, modification, and access dates
- Custom rules - User-defined organization patterns
- Duplicate detection - Find and handle duplicate files
- Organization reports - Statistics and summaries
- Undo functionality - Safe operations with rollback
- Configurable settings - Customizable organization profiles
Learning Objectives
By the end of this project, you'll be able to:
- Work extensively with the file system using Python
- Implement intelligent file categorization algorithms
- Handle file operations safely with error recovery
- Create configuration-driven applications
- Build command-line tools with rich output
- Implement logging and progress tracking
- Design undo/rollback functionality
Project Requirements
Core Features
File Analysis
- Detect file types using MIME types and extensions
- Extract metadata (creation date, size, etc.)
- Analyze file content for better categorization
- Handle various file formats and encodings
Intelligent Organization
- Automatic folder creation based on categories
- Date-based subfolder organization
- Custom naming patterns and rules
- Conflict resolution for duplicate names
Safety Features
- Dry-run mode for testing changes
- Undo functionality to reverse operations
- Backup creation before major changes
- Progress tracking and interruption handling
Duplicate Management
- File hash calculation for duplicate detection
- Multiple duplicate handling strategies
- Size and content comparison
- Duplicate reporting and cleanup
Advanced Features
Custom Rules Engine
- User-defined organization rules
- Pattern matching and conditions
- Priority-based rule application
- Rule validation and testing
Reporting and Analytics
- Organization statistics and summaries
- File type distribution analysis
- Storage usage reports
- Operation logs and history
Configuration Management
- Multiple organization profiles
- User preferences and settings
- Rule sets and templates
- Configuration validation
Project Structure
filemaster/
├── filemaster/
│   ├── __init__.py        # Package initialization
│   ├── organizer.py       # Main organizer logic
│   ├── analyzer.py        # File analysis and categorization
│   ├── rules.py           # Custom rules engine
│   ├── duplicates.py      # Duplicate file detection
│   ├── utils.py           # Helper functions
│   └── config.py          # Configuration management
├── data/
│   ├── config.yaml        # Default configuration
│   ├── rules/             # Custom rule files
│   └── logs/              # Operation logs
├── tests/
│   ├── test_organizer.py
│   ├── test_analyzer.py
│   └── test_duplicates.py
├── scripts/
│   ├── organize.py        # Main CLI script
│   └── setup.py           # Installation script
├── docs/
│   ├── README.md
│   ├── rules.md           # Rules documentation
│   └── examples.md        # Usage examples
├── requirements.txt
└── setup.py
Step 1: File Analysis System
Create the core file analysis and categorization system.
# filemaster/analyzer.py
import os
import mimetypes
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import hashlib
from datetime import datetime
import magic # python-magic for better MIME type detection
from PIL import Image # For image metadata
import exifread # For EXIF data
from mutagen import File as AudioFile # For audio metadata
from hachoir.parser import createParser # For video metadata
from hachoir.metadata import extractMetadata
class FileAnalyzer:
"""Analyzes files and extracts metadata for organization."""
# File type categories
CATEGORIES = {
'documents': {
'extensions': ['.pdf', '.doc', '.docx', '.txt', '.rtf', '.odt', '.xls', '.xlsx', '.ppt', '.pptx'],
'mime_types': ['application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument'],
'folder': 'Documents'
},
'images': {
'extensions': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp', '.svg'],
'mime_types': ['image/jpeg', 'image/png', 'image/gif', 'image/bmp', 'image/tiff'],
'folder': 'Images'
},
'videos': {
'extensions': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm'],
'mime_types': ['video/mp4', 'video/avi', 'video/x-matroska'],
'folder': 'Videos'
},
'audio': {
'extensions': ['.mp3', '.wav', '.flac', '.aac', '.ogg', '.wma'],
'mime_types': ['audio/mpeg', 'audio/wav', 'audio/flac'],
'folder': 'Audio'
},
'archives': {
'extensions': ['.zip', '.rar', '.7z', '.tar', '.gz', '.bz2'],
'mime_types': ['application/zip', 'application/x-rar-compressed'],
'folder': 'Archives'
},
'code': {
'extensions': ['.py', '.js', '.html', '.css', '.java', '.cpp', '.c', '.php', '.rb', '.go'],
'mime_types': ['text/x-python', 'application/javascript', 'text/html'],
'folder': 'Code'
},
'executables': {
'extensions': ['.exe', '.msi', '.dmg', '.deb', '.rpm', '.app'],
'mime_types': ['application/x-executable', 'application/x-msi'],
'folder': 'Executables'
}
}
def __init__(self):
# Initialize MIME type detection
mimetypes.init()
def analyze_file(self, file_path: str) -> Dict:
"""Analyze a single file and return metadata."""
path = Path(file_path)
if not path.exists() or not path.is_file():
raise FileNotFoundError(f"File not found: {file_path}")
# Basic file information
stat = path.stat()
metadata = {
'path': str(path.absolute()),
'name': path.name,
'stem': path.stem,
'suffix': path.suffix.lower(),
'size': stat.st_size,
'created': datetime.fromtimestamp(stat.st_ctime),
'modified': datetime.fromtimestamp(stat.st_mtime),
'accessed': datetime.fromtimestamp(stat.st_atime),
'category': self._categorize_file(path),
'mime_type': self._get_mime_type(path),
'hash': self._calculate_hash(path),
'is_hidden': path.name.startswith('.'),
'is_system': False # Could be enhanced to detect system files
}
# Add category-specific metadata
metadata.update(self._extract_specialized_metadata(path, metadata['category']))
return metadata
def _categorize_file(self, path: Path) -> str:
"""Categorize file based on extension and MIME type."""
suffix = path.suffix.lower()
mime_type = self._get_mime_type(path)
# Check each category
for category, info in self.CATEGORIES.items():
if suffix in info['extensions']:
return category
if mime_type and any(mt in mime_type for mt in info['mime_types']):
return category
# Default category
return 'other'
def _get_mime_type(self, path: Path) -> Optional[str]:
"""Get MIME type using multiple methods."""
# Try python-magic first (most accurate)
try:
return magic.from_file(str(path), mime=True)
except Exception:
pass
# Fallback to mimetypes
mime_type, _ = mimetypes.guess_type(str(path))
return mime_type
def _calculate_hash(self, path: Path, algorithm: str = 'md5') -> str:
"""Calculate file hash for duplicate detection."""
hash_func = hashlib.new(algorithm)
with open(path, 'rb') as f:
# Read in chunks to handle large files
for chunk in iter(lambda: f.read(4096), b""):
hash_func.update(chunk)
return hash_func.hexdigest()
def _extract_specialized_metadata(self, path: Path, category: str) -> Dict:
"""Extract category-specific metadata."""
metadata = {}
try:
if category == 'images':
metadata.update(self._extract_image_metadata(path))
elif category == 'audio':
metadata.update(self._extract_audio_metadata(path))
elif category == 'videos':
metadata.update(self._extract_video_metadata(path))
elif category == 'documents':
metadata.update(self._extract_document_metadata(path))
except Exception as e:
# Don't fail analysis if metadata extraction fails
metadata['extraction_error'] = str(e)
return metadata
def _extract_image_metadata(self, path: Path) -> Dict:
"""Extract image-specific metadata."""
metadata = {}
try:
# Basic image info with PIL
with Image.open(path) as img:
metadata.update({
'width': img.width,
'height': img.height,
'format': img.format,
'mode': img.mode
})
# EXIF data
with open(path, 'rb') as f:
exif_data = exifread.process_file(f)
if exif_data:
# Extract common EXIF tags
exif_info = {}
for tag, value in exif_data.items():
if tag in ['EXIF DateTimeOriginal', 'EXIF DateTimeDigitized', 'Image DateTime']:
try:
# Parse EXIF date format: '2023:12:25 14:30:00'
date_str = str(value)
exif_info['datetime'] = datetime.strptime(date_str, '%Y:%m:%d %H:%M:%S')
except ValueError:
pass
elif tag in ['EXIF Make', 'EXIF Model']:
exif_info[tag.split()[-1].lower()] = str(value)
metadata['exif'] = exif_info
except Exception as e:
metadata['image_error'] = str(e)
return metadata
def _extract_audio_metadata(self, path: Path) -> Dict:
"""Extract audio-specific metadata."""
metadata = {}
try:
audio = AudioFile(path)
if audio:
metadata.update({
'length': audio.info.length if hasattr(audio.info, 'length') else None,
'bitrate': audio.info.bitrate if hasattr(audio.info, 'bitrate') else None,
'sample_rate': audio.info.sample_rate if hasattr(audio.info, 'sample_rate') else None,
'channels': audio.info.channels if hasattr(audio.info, 'channels') else None
})
# Extract tags if available
if getattr(audio, 'tags', None):
tags = {}
for key in ['title', 'artist', 'album', 'year', 'genre']:
if key in audio.tags:
tags[key] = str(audio.tags[key])
if tags:
metadata['tags'] = tags
except Exception as e:
metadata['audio_error'] = str(e)
return metadata
def _extract_video_metadata(self, path: Path) -> Dict:
"""Extract video-specific metadata."""
metadata = {}
try:
parser = createParser(str(path))
if parser:
with parser:
extractor = extractMetadata(parser)
if extractor:
metadata.update({
'duration': extractor.get('duration'),
'width': extractor.get('width'),
'height': extractor.get('height'),
'frame_rate': extractor.get('frame_rate'),
'bit_rate': extractor.get('bit_rate'),
'format': extractor.get('format')
})
except Exception as e:
metadata['video_error'] = str(e)
return metadata
def _extract_document_metadata(self, path: Path) -> Dict:
"""Extract document-specific metadata."""
metadata = {}
# For now, just detect if it's text-based
mime_type = self._get_mime_type(path)
if mime_type and mime_type.startswith('text/'):
metadata['is_text'] = True
# Try to detect encoding and read first few lines
try:
with open(path, 'r', encoding='utf-8') as f:
first_lines = []
for i, line in enumerate(f):
if i >= 5: # First 5 lines
break
first_lines.append(line.strip())
metadata['preview'] = first_lines
except UnicodeDecodeError:
metadata['encoding'] = 'binary'
return metadata
def get_category_folder(self, category: str) -> str:
"""Get the folder name for a category."""
return self.CATEGORIES.get(category, {}).get('folder', 'Other')
def is_organizable_file(self, path: Path) -> bool:
"""Check if a file should be organized."""
# Skip hidden files and very small files (under 100 bytes)
if path.name.startswith('.') or path.stat().st_size < 100:
return False
return True
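The analyzer categorizes by extension first, then falls back to MIME type. Here is a minimal, self-contained sketch of that two-step lookup using only the stdlib `mimetypes` module; the trimmed category table and `mime_prefix` field are illustrative simplifications, not part of the `FileAnalyzer` API:

```python
import mimetypes
from pathlib import Path

# Trimmed, hypothetical subset of the CATEGORIES table above
CATEGORIES = {
    'images': {'extensions': ['.jpg', '.jpeg', '.png'], 'mime_prefix': 'image/'},
    'audio': {'extensions': ['.mp3', '.flac'], 'mime_prefix': 'audio/'},
    'documents': {'extensions': ['.pdf', '.txt'], 'mime_prefix': 'application/pdf'},
}

def categorize(filename: str) -> str:
    """Return a category name: extension match first, MIME type as fallback."""
    suffix = Path(filename).suffix.lower()
    for category, info in CATEGORIES.items():
        if suffix in info['extensions']:
            return category
    # Fallback: guess the MIME type from the filename alone
    mime_type, _ = mimetypes.guess_type(filename)
    if mime_type:
        for category, info in CATEGORIES.items():
            if mime_type.startswith(info['mime_prefix']):
                return category
    return 'other'

print(categorize('holiday.JPG'))   # matched by extension
print(categorize('track.wav'))     # not in the table; matched via MIME fallback
print(categorize('mystery.xyz'))   # no match either way
```

In the full analyzer, `python-magic` inspects the file's actual bytes before this name-based fallback, which catches files with wrong or missing extensions.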
Step 2: Rules Engine
Create a flexible rules engine for custom organization logic.
# filemaster/rules.py
import re
from typing import Dict, List, Callable, Any, Optional
from pathlib import Path
from datetime import datetime
import yaml
class OrganizationRule:
"""Represents a single organization rule."""
def __init__(self, name: str, conditions: List[Dict], actions: List[Dict], priority: int = 0):
self.name = name
self.conditions = conditions
self.actions = actions
self.priority = priority
def evaluate(self, file_metadata: Dict) -> bool:
"""Evaluate if this rule applies to a file."""
for condition in self.conditions:
if not self._check_condition(condition, file_metadata):
return False
return True
def _check_condition(self, condition: Dict, metadata: Dict) -> bool:
"""Check a single condition."""
field = condition.get('field')
operator = condition.get('operator', 'equals')
value = condition.get('value')
if field not in metadata:
return False
actual_value = metadata[field]
# Handle different operators
if operator == 'equals':
return actual_value == value
elif operator == 'not_equals':
return actual_value != value
elif operator == 'contains':
return value in str(actual_value)
elif operator == 'not_contains':
return value not in str(actual_value)
elif operator == 'matches_regex':
return bool(re.match(value, str(actual_value)))
elif operator == 'greater_than':
return actual_value > value
elif operator == 'less_than':
return actual_value < value
elif operator == 'in_list':
return actual_value in value
elif operator == 'not_in_list':
return actual_value not in value
return False
def apply_actions(self, file_metadata: Dict, base_path: Path) -> Dict:
"""Apply rule actions to determine new path."""
new_path = base_path
for action in self.actions:
action_type = action.get('type')
if action_type == 'set_folder':
folder = action.get('folder', '')
# Replace variables in folder name
folder = self._replace_variables(folder, file_metadata)
new_path = new_path / folder
elif action_type == 'set_filename':
filename = action.get('filename', '')
filename = self._replace_variables(filename, file_metadata)
new_path = new_path.parent / filename
elif action_type == 'add_date_folder':
date_field = action.get('date_field', 'created')
format_str = action.get('format', '%Y-%m-%d')
if date_field in file_metadata:
date_obj = file_metadata[date_field]
if isinstance(date_obj, datetime):
folder_name = date_obj.strftime(format_str)
new_path = new_path / folder_name
return {
'new_path': new_path,
'rule_applied': self.name
}
def _replace_variables(self, template: str, metadata: Dict) -> str:
"""Replace variables in template strings."""
result = template
# Replace {field} with metadata values; datetimes also get :date/:year/:month variants
for key, value in metadata.items():
if isinstance(value, datetime):
result = result.replace(f'{{{key}}}', value.strftime('%Y-%m-%d_%H-%M-%S'))
result = result.replace(f'{{{key}:date}}', value.strftime('%Y-%m-%d'))
result = result.replace(f'{{{key}:year}}', value.strftime('%Y'))
result = result.replace(f'{{{key}:month}}', value.strftime('%m'))
elif isinstance(value, dict):
# Support one level of nesting, e.g. {tags[artist]}
for sub_key, sub_value in value.items():
result = result.replace(f'{{{key}[{sub_key}]}}', str(sub_value))
else:
result = result.replace(f'{{{key}}}', str(value))
return result
class RulesEngine:
"""Manages and applies organization rules."""
def __init__(self):
self.rules: List[OrganizationRule] = []
def add_rule(self, rule: OrganizationRule):
"""Add a rule to the engine."""
self.rules.append(rule)
# Sort by priority (higher priority first)
self.rules.sort(key=lambda r: r.priority, reverse=True)
def load_rules_from_file(self, file_path: str):
"""Load rules from a YAML file."""
with open(file_path, 'r') as f:
rules_data = yaml.safe_load(f)
for rule_data in rules_data.get('rules', []):
rule = OrganizationRule(
name=rule_data['name'],
conditions=rule_data.get('conditions', []),
actions=rule_data.get('actions', []),
priority=rule_data.get('priority', 0)
)
self.add_rule(rule)
def apply_rules(self, file_metadata: Dict, base_path: Path) -> Optional[Dict]:
"""Apply the first matching rule to a file."""
for rule in self.rules:
if rule.evaluate(file_metadata):
return rule.apply_actions(file_metadata, base_path)
return None
def get_matching_rules(self, file_metadata: Dict) -> List[OrganizationRule]:
"""Get all rules that match a file."""
return [rule for rule in self.rules if rule.evaluate(file_metadata)]
def validate_rule(self, rule_data: Dict) -> List[str]:
"""Validate a rule configuration."""
errors = []
if 'name' not in rule_data:
errors.append("Rule must have a 'name' field")
if 'conditions' not in rule_data:
errors.append("Rule must have 'conditions' field")
elif not isinstance(rule_data['conditions'], list):
errors.append("'conditions' must be a list")
if 'actions' not in rule_data:
errors.append("Rule must have 'actions' field")
elif not isinstance(rule_data['actions'], list):
errors.append("'actions' must be a list")
# Validate conditions
for i, condition in enumerate(rule_data.get('conditions', [])):
if 'field' not in condition:
errors.append(f"Condition {i} must have a 'field'")
if 'operator' not in condition:
errors.append(f"Condition {i} must have an 'operator'")
# Validate actions
for i, action in enumerate(rule_data.get('actions', [])):
if 'type' not in action:
errors.append(f"Action {i} must have a 'type'")
return errors
# Default rules
DEFAULT_RULES = [
{
'name': 'Camera Photos',
'priority': 10,
'conditions': [
{'field': 'category', 'operator': 'equals', 'value': 'images'},
{'field': 'exif', 'operator': 'not_equals', 'value': None}
],
'actions': [
{'type': 'set_folder', 'folder': 'Photos/{created:date}'},
{'type': 'set_filename', 'filename': 'IMG_{created:date}_{name}'}
]
},
{
'name': 'Screenshots',
'priority': 8,
'conditions': [
{'field': 'category', 'operator': 'equals', 'value': 'images'},
{'field': 'name', 'operator': 'matches_regex', 'value': r'.*(screenshot|screen.*shot).*'}
],
'actions': [
{'type': 'set_folder', 'folder': 'Screenshots'},
{'type': 'add_date_folder'}
]
},
{
'name': 'Music by Artist',
'priority': 7,
'conditions': [
{'field': 'category', 'operator': 'equals', 'value': 'audio'},
{'field': 'tags', 'operator': 'not_equals', 'value': None}
],
'actions': [
{'type': 'set_folder', 'folder': 'Music/{tags[artist]}/{tags[album]}'}
]
},
{
'name': 'Documents by Date',
'priority': 5,
'conditions': [
{'field': 'category', 'operator': 'equals', 'value': 'documents'}
],
'actions': [
{'type': 'set_folder', 'folder': 'Documents/{created:year}/{created:month}'}
]
}
]
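A rule fires only when every one of its conditions holds. The operator dispatch in `_check_condition` can be sketched standalone like this; the "large screenshot" rule and the metadata dict are made-up examples, not part of the default rule set:

```python
import re
from typing import Any, Dict

def check_condition(condition: Dict[str, Any], metadata: Dict[str, Any]) -> bool:
    """Evaluate one condition dict against file metadata (mirrors _check_condition)."""
    field = condition['field']
    op = condition.get('operator', 'equals')
    value = condition.get('value')
    if field not in metadata:
        return False
    actual = metadata[field]
    ops = {
        'equals': lambda: actual == value,
        'not_equals': lambda: actual != value,
        'contains': lambda: value in str(actual),
        'matches_regex': lambda: bool(re.match(value, str(actual))),
        'greater_than': lambda: actual > value,
        'in_list': lambda: actual in value,
    }
    return ops.get(op, lambda: False)()  # unknown operators never match

# A made-up "large screenshot" rule: all conditions must hold
rule = [
    {'field': 'name', 'operator': 'matches_regex', 'value': r'.*screenshot.*'},
    {'field': 'size', 'operator': 'greater_than', 'value': 1_000_000},
]
metadata = {'name': 'screenshot_2023.png', 'size': 2_500_000, 'category': 'images'}
print(all(check_condition(c, metadata) for c in rule))  # True
```

Because `RulesEngine.apply_rules` stops at the first match, putting higher-priority (more specific) rules first is what keeps a generic "Documents by Date" rule from swallowing files a specific rule should handle.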
Step 3: Duplicate Detection
Implement duplicate file detection and handling.
# filemaster/duplicates.py
import os
import hashlib
from pathlib import Path
from typing import Dict, List, Set, Tuple, Optional
from collections import defaultdict
import filecmp
from dataclasses import dataclass
@dataclass
class DuplicateGroup:
"""Represents a group of duplicate files."""
hash_value: str
files: List[Path]
total_size: int
def __post_init__(self):
self.total_size = sum(os.path.getsize(f) for f in self.files if f.exists())
class DuplicateDetector:
"""Detects and manages duplicate files."""
def __init__(self):
self.hash_cache: Dict[str, List[Path]] = defaultdict(list)
def scan_directory(self, directory: Path, recursive: bool = True) -> Dict[str, DuplicateGroup]:
"""Scan directory for duplicate files."""
if not directory.exists() or not directory.is_dir():
raise ValueError(f"Invalid directory: {directory}")
# Clear previous results
self.hash_cache.clear()
# Scan files
pattern = "**/*" if recursive else "*"
for file_path in directory.glob(pattern):
if file_path.is_file():
try:
file_hash = self._calculate_file_hash(file_path)
self.hash_cache[file_hash].append(file_path)
except OSError:
# Skip files that can't be read
continue
# Create duplicate groups
duplicates = {}
for hash_value, files in self.hash_cache.items():
if len(files) > 1:
duplicates[hash_value] = DuplicateGroup(hash_value, files, 0)
return duplicates
def _calculate_file_hash(self, file_path: Path, algorithm: str = 'md5', chunk_size: int = 8192) -> str:
"""Calculate hash of file content."""
hash_func = hashlib.new(algorithm)
try:
with open(file_path, 'rb') as f:
while chunk := f.read(chunk_size):
hash_func.update(chunk)
except (OSError, IOError):
# Return a special hash for unreadable files
return f"unreadable_{file_path.stat().st_size}_{file_path.stat().st_mtime}"
return hash_func.hexdigest()
def find_duplicates_by_size(self, directory: Path, recursive: bool = True) -> Dict[int, List[Path]]:
"""Find potential duplicates by file size first (faster)."""
size_groups: Dict[int, List[Path]] = defaultdict(list)
pattern = "**/*" if recursive else "*"
for file_path in directory.glob(pattern):
if file_path.is_file():
try:
size = file_path.stat().st_size
if size > 0: # Skip empty files
size_groups[size].append(file_path)
except OSError:
continue
# Filter to groups with multiple files
return {size: files for size, files in size_groups.items() if len(files) > 1}
def verify_duplicates(self, files: List[Path]) -> List[Path]:
"""Verify which files in a list are actually duplicates."""
if len(files) < 2:
return []
# Use the first file as reference
reference = files[0]
duplicates = [reference]
for file_path in files[1:]:
try:
if filecmp.cmp(reference, file_path, shallow=False):
duplicates.append(file_path)
except OSError:
continue
return duplicates if len(duplicates) > 1 else []
def get_duplicate_stats(self, duplicates: Dict[str, DuplicateGroup]) -> Dict:
"""Get statistics about duplicates."""
total_groups = len(duplicates)
total_files = sum(len(group.files) for group in duplicates.values())
# Files in a group are identical, so one copy's size is total_size / len(files)
total_wasted_space = sum(group.total_size - group.total_size // len(group.files) for group in duplicates.values())
return {
'total_duplicate_groups': total_groups,
'total_duplicate_files': total_files,
'total_wasted_space': total_wasted_space,
'average_files_per_group': total_files / total_groups if total_groups > 0 else 0
}
def suggest_cleanup_actions(self, duplicates: Dict[str, DuplicateGroup]) -> List[Dict]:
"""Suggest actions for cleaning up duplicates."""
suggestions = []
for hash_value, group in duplicates.items():
if len(group.files) < 2:
continue
# Sort by modification time (keep newest)
sorted_files = sorted(group.files, key=lambda f: f.stat().st_mtime, reverse=True)
suggestions.append({
'hash': hash_value,
'files': [str(f) for f in sorted_files],
'keep': str(sorted_files[0]),
'delete': [str(f) for f in sorted_files[1:]],
'wasted_space': group.total_size - group.total_size // len(group.files),
'reason': 'Keep newest file, delete older duplicates'
})
return suggestions
class DuplicateHandler:
"""Handles duplicate file operations."""
def __init__(self, backup_dir: Optional[Path] = None):
self.backup_dir = backup_dir or Path.home() / '.filemaster' / 'backups'
self.backup_dir.mkdir(parents=True, exist_ok=True)
def remove_duplicates(self, duplicates: List[Dict], dry_run: bool = True) -> Dict:
"""Remove duplicate files based on suggestions."""
results = {
'processed': 0,
'deleted': 0,
'backed_up': 0,
'errors': 0,
'total_space_saved': 0
}
for dup_info in duplicates:
keep_file = Path(dup_info['keep'])
delete_files = [Path(f) for f in dup_info['delete']]
for file_path in delete_files:
try:
if not file_path.exists():
continue
results['processed'] += 1
if not dry_run:
# Record the size before the file is deleted
file_size = file_path.stat().st_size
# Create backup
backup_path = self._create_backup(file_path)
if backup_path:
results['backed_up'] += 1
# Delete file
file_path.unlink()
results['deleted'] += 1
results['total_space_saved'] += file_size
except Exception as e:
results['errors'] += 1
print(f"Error processing {file_path}: {e}")
return results
def _create_backup(self, file_path: Path) -> Optional[Path]:
"""Create a backup of a file before deletion."""
try:
backup_name = f"{file_path.name}.backup_{int(file_path.stat().st_mtime)}"
backup_path = self.backup_dir / backup_name
# Handle name conflicts
counter = 1
while backup_path.exists():
stem = file_path.stem
suffix = file_path.suffix
backup_name = f"{stem}.backup_{counter}_{int(file_path.stat().st_mtime)}{suffix}"
backup_path = self.backup_dir / backup_name
counter += 1
import shutil
shutil.copy2(file_path, backup_path)
return backup_path
except Exception as e:
print(f"Failed to create backup for {file_path}: {e}")
return None
def restore_from_backup(self, backup_file: Path, target_dir: Path) -> bool:
"""Restore a file from backup."""
try:
import shutil
# Extract original filename from backup name
backup_name = backup_file.name
if '.backup_' in backup_name:
# Strip the backup suffix to recover the original filename
original_name = backup_name.split('.backup_')[0]
target_path = target_dir / original_name
shutil.copy2(backup_file, target_path)
return True
else:
return False
except Exception as e:
print(f"Failed to restore {backup_file}: {e}")
return False
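The size-first pass narrows candidates cheaply; hashing then confirms which same-sized files are true duplicates. The hash-grouping core can be exercised end to end against temporary files (the filenames and contents here are made up):

```python
import hashlib
import tempfile
from collections import defaultdict
from pathlib import Path

def file_hash(path: Path, chunk_size: int = 8192) -> str:
    """Chunked SHA-256 of a file's contents (avoids loading large files at once)."""
    h = hashlib.sha256()
    with open(path, 'rb') as f:
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()

def find_duplicates(directory: Path) -> dict:
    """Group files by content hash; keep only groups with more than one member."""
    groups = defaultdict(list)
    for p in directory.rglob('*'):
        if p.is_file():
            groups[file_hash(p)].append(p)
    return {h: files for h, files in groups.items() if len(files) > 1}

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / 'a.txt').write_bytes(b'same content')
    (root / 'b.txt').write_bytes(b'same content')       # duplicate of a.txt
    (root / 'c.txt').write_bytes(b'different content')  # unique
    dupes = find_duplicates(root)
    print(len(dupes))  # 1 duplicate group (a.txt and b.txt)
```

This sketch uses SHA-256 rather than the MD5 default above; MD5 is faster but collision-prone, so for anything beyond quick deduplication a stronger hash (or a follow-up byte-for-byte `filecmp.cmp`, as in `verify_duplicates`) is the safer choice.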
Step 4: Main Organizer
Create the core file organization logic.
# filemaster/organizer.py
import os
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Callable
from datetime import datetime
import logging
from collections import defaultdict, Counter
import json
from .analyzer import FileAnalyzer
from .rules import RulesEngine, OrganizationRule
from .duplicates import DuplicateDetector, DuplicateHandler
class FileOrganizer:
"""Main file organization engine."""
def __init__(self, config: Optional[Dict] = None):
self.config = config or self._get_default_config()
self.analyzer = FileAnalyzer()
self.rules_engine = RulesEngine()
self.duplicate_detector = DuplicateDetector()
self.duplicate_handler = DuplicateHandler()
# Setup logging
self._setup_logging()
# Load default rules
self._load_default_rules()
def _get_default_config(self) -> Dict:
"""Get default configuration."""
return {
'dry_run': True,
'recursive': True,
'follow_symlinks': False,
'backup_before_move': True,
'conflict_resolution': 'rename', # rename, skip, overwrite
'date_format': '%Y-%m-%d',
'log_level': 'INFO',
'exclude_patterns': ['.*', '*.tmp', '*.bak'],
'include_hidden': False
}
def _setup_logging(self):
"""Setup logging configuration."""
log_level = getattr(logging, self.config.get('log_level', 'INFO').upper())
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('filemaster.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger('FileOrganizer')
def _load_default_rules(self):
"""Load default organization rules."""
from .rules import DEFAULT_RULES
for rule_data in DEFAULT_RULES:
rule = OrganizationRule(
name=rule_data['name'],
conditions=rule_data['conditions'],
actions=rule_data['actions'],
priority=rule_data['priority']
)
self.rules_engine.add_rule(rule)
def organize_directory(self, source_dir: str, target_dir: Optional[str] = None,
progress_callback: Optional[Callable] = None) -> Dict:
"""Organize files in a directory."""
source_path = Path(source_dir)
target_path = Path(target_dir) if target_dir else source_path / 'Organized'
if not source_path.exists() or not source_path.is_dir():
raise ValueError(f"Source directory does not exist: {source_dir}")
self.logger.info(f"Starting organization of {source_path} to {target_path}")
# Scan files
files_to_process = self._scan_files(source_path)
self.logger.info(f"Found {len(files_to_process)} files to process")
# Process files
results = {
'total_files': len(files_to_process),
'processed': 0,
'moved': 0,
'skipped': 0,
'errors': 0,
'categories': defaultdict(int),
'operations': []
}
for i, file_path in enumerate(files_to_process):
try:
operation = self._process_file(file_path, target_path)
results['operations'].append(operation)
results['processed'] += 1
if operation['action'] == 'moved':
results['moved'] += 1
results['categories'][operation.get('category', 'other')] += 1
elif operation['action'] == 'skipped':
results['skipped'] += 1
if progress_callback:
progress_callback(i + 1, len(files_to_process), operation)
except Exception as e:
self.logger.error(f"Error processing {file_path}: {e}")
results['errors'] += 1
results['operations'].append({
'file': str(file_path),
'action': 'error',
'error': str(e)
})
# Generate summary
results['summary'] = self._generate_summary(results)
self.logger.info(f"Organization complete: {results['moved']} files moved, {results['errors']} errors")
return results
def _scan_files(self, directory: Path) -> List[Path]:
"""Scan directory for files to process."""
files = []
pattern = "**/*" if self.config['recursive'] else "*"
for path in directory.glob(pattern):
if path.is_file() and self._should_process_file(path):
files.append(path)
return files
def _should_process_file(self, file_path: Path) -> bool:
"""Check if a file should be processed."""
# Skip hidden files unless configured to include them
if not self.config['include_hidden'] and file_path.name.startswith('.'):
return False
# Check exclude patterns
for pattern in self.config['exclude_patterns']:
if file_path.match(pattern):
return False
# Check if file is organizable
return self.analyzer.is_organizable_file(file_path)
def _process_file(self, file_path: Path, target_base: Path) -> Dict:
"""Process a single file."""
# Analyze file
metadata = self.analyzer.analyze_file(str(file_path))
category = metadata['category']
# Apply rules
rule_result = self.rules_engine.apply_rules(metadata, target_base)
if rule_result:
# Custom rule applied
new_path = rule_result['new_path']
rule_name = rule_result['rule_applied']
else:
# Default organization
category_folder = self.analyzer.get_category_folder(category)
new_path = target_base / category_folder / file_path.name
# Ensure target directory exists
new_path.parent.mkdir(parents=True, exist_ok=True)
# Handle conflicts
final_path = self._resolve_conflicts(file_path, new_path)
operation = {
'file': str(file_path),
'target': str(final_path),
'category': category,
'rule': rule_result['rule_applied'] if rule_result else None,
'action': 'skipped'
}
# Perform move operation (with the 'skip' strategy, final_path equals the source)
if final_path == file_path:
operation['action'] = 'skipped'
elif not self.config['dry_run']:
if self._move_file(file_path, final_path):
operation['action'] = 'moved'
else:
operation['action'] = 'error'
else:
operation['action'] = 'would_move'
return operation
def _resolve_conflicts(self, source_path: Path, target_path: Path) -> Path:
"""Resolve filename conflicts."""
if not target_path.exists():
return target_path
strategy = self.config['conflict_resolution']
if strategy == 'skip':
return source_path # Don't move
elif strategy == 'overwrite':
return target_path
elif strategy == 'rename':
# Add number suffix
stem = target_path.stem
suffix = target_path.suffix
parent = target_path.parent
counter = 1
while True:
new_name = f"{stem}_{counter}{suffix}"
new_path = parent / new_name
if not new_path.exists():
return new_path
counter += 1
return target_path
def _move_file(self, source: Path, target: Path) -> bool:
"""Move a file with error handling."""
try:
if self.config['backup_before_move']:
# Create backup (simple copy for now)
backup_path = source.parent / f"{source.name}.backup"
shutil.copy2(source, backup_path)
shutil.move(str(source), str(target))
return True
except Exception as e:
self.logger.error(f"Failed to move {source} to {target}: {e}")
return False
def _generate_summary(self, results: Dict) -> Dict:
"""Generate organization summary."""
return {
'completion_time': datetime.now().isoformat(),
'success_rate': (results['moved'] / results['total_files'] * 100) if results['total_files'] > 0 else 0,
'most_common_category': max(results['categories'].items(), key=lambda x: x[1]) if results['categories'] else None,
'total_size_processed': sum(op.get('size', 0) for op in results['operations'] if 'size' in op)
}
def find_duplicates(self, directory: str) -> Dict:
"""Find duplicate files in directory."""
dir_path = Path(directory)
duplicates = self.duplicate_detector.scan_directory(dir_path, self.config['recursive'])
stats = self.duplicate_detector.get_duplicate_stats(duplicates)
return {
'duplicates': {k: {'files': [str(f) for f in v.files], 'total_size': v.total_size}
for k, v in duplicates.items()},
'stats': stats,
'suggestions': self.duplicate_detector.suggest_cleanup_actions(duplicates)
}
def cleanup_duplicates(self, duplicate_suggestions: List[Dict], dry_run: bool = True) -> Dict:
"""Clean up duplicate files."""
return self.duplicate_handler.remove_duplicates(duplicate_suggestions, dry_run)
def undo_last_operation(self) -> bool:
"""Undo the last organization operation."""
# This would require storing operation history
# Implementation would depend on how operations are tracked
self.logger.warning("Undo functionality not yet implemented")
return False
def export_report(self, results: Dict, output_file: str):
"""Export organization results to file."""
with open(output_file, 'w') as f:
json.dump(results, f, indent=2, default=str)
self.logger.info(f"Report exported to {output_file}")
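The 'rename' conflict strategy appends `_1`, `_2`, ... before the extension until it finds a free name. A standalone sketch of that loop, exercised in a temporary directory (the filenames are illustrative):

```python
import tempfile
from pathlib import Path

def resolve_rename(target: Path) -> Path:
    """Return target unchanged if free, else append a numeric suffix before the extension."""
    if not target.exists():
        return target
    counter = 1
    while True:
        candidate = target.parent / f"{target.stem}_{counter}{target.suffix}"
        if not candidate.exists():
            return candidate
        counter += 1

with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    (folder / 'report.pdf').touch()    # target name already taken
    (folder / 'report_1.pdf').touch()  # first fallback also taken
    resolved = resolve_rename(folder / 'report.pdf').name
    print(resolved)  # report_2.pdf
```

Note this check-then-move pattern has a small race window if another process creates the candidate file in between; for a single-user organizer run that is usually acceptable, but a stricter version could open the target with `O_CREAT | O_EXCL`.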
Step 5: Command-Line Interface
Create the CLI for user interaction.
# scripts/organize.py
#!/usr/bin/env python3
"""
FileMaster - Intelligent File Organizer
Command-line interface for organizing files automatically.
"""
import argparse
import sys
import json
from pathlib import Path
from typing import Optional
import time
from filemaster.organizer import FileOrganizer
from filemaster.config import load_config
def create_parser() -> argparse.ArgumentParser:
"""Create command-line argument parser."""
parser = argparse.ArgumentParser(
prog='filemaster',
description='Intelligent File Organizer - Automatically organize your files'
)
subparsers = parser.add_subparsers(dest='command', help='Available commands')
# Organize command
organize_parser = subparsers.add_parser('organize', help='Organize files in directory')
organize_parser.add_argument('source', help='Source directory to organize')
organize_parser.add_argument('-t', '--target', help='Target directory (default: source/Organized)')
organize_parser.add_argument('--dry-run', action='store_true', help='Show what would be done without doing it')
organize_parser.add_argument('--no-recursive', action='store_true', help='Do not scan subdirectories')
organize_parser.add_argument('--no-backup', action='store_true', help='Do not create backups before moving')
organize_parser.add_argument('-c', '--config', help='Configuration file path')
# Duplicates command
dup_parser = subparsers.add_parser('duplicates', help='Find duplicate files')
dup_parser.add_argument('directory', help='Directory to scan for duplicates')
dup_parser.add_argument('--no-recursive', action='store_true', help='Do not scan subdirectories')
dup_parser.add_argument('--cleanup', action='store_true', help='Automatically clean up duplicates')
dup_parser.add_argument('--dry-run', action='store_true', help='Show cleanup actions without executing')
# Rules command
rules_parser = subparsers.add_parser('rules', help='Manage organization rules')
rules_parser.add_argument('--list', action='store_true', help='List current rules')
rules_parser.add_argument('--add', help='Add rule from file')
rules_parser.add_argument('--validate', help='Validate rule file')
# Report command
report_parser = subparsers.add_parser('report', help='Generate organization report')
report_parser.add_argument('results_file', help='JSON results file from previous organization')
report_parser.add_argument('-o', '--output', help='Output file (default: report.txt)')
return parser
def progress_callback(current: int, total: int, operation: dict):
"""Progress callback for organization operations."""
    percent = (current / total * 100) if total else 100.0
action = operation.get('action', 'processing')
file_name = Path(operation['file']).name
print(f"\r[{percent:5.1f}%] {action}: {file_name}", end='', flush=True)
if current == total:
print() # New line at end
def main():
"""Main CLI entry point."""
parser = create_parser()
args = parser.parse_args()
if not args.command:
parser.print_help()
return
try:
if args.command == 'organize':
handle_organize(args)
elif args.command == 'duplicates':
handle_duplicates(args)
elif args.command == 'rules':
handle_rules(args)
elif args.command == 'report':
handle_report(args)
else:
print(f"Unknown command: {args.command}")
sys.exit(1)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
def handle_organize(args):
"""Handle organize command."""
# Load configuration
config = load_config(args.config) if args.config else {}
# Override config with command-line args
config.update({
'dry_run': args.dry_run,
'recursive': not args.no_recursive,
'backup_before_move': not args.no_backup
})
# Create organizer
organizer = FileOrganizer(config)
print(f"FileMaster - Organizing files in: {args.source}")
if config['dry_run']:
print("DRY RUN MODE - No files will be moved")
print("-" * 50)
start_time = time.time()
# Organize files
results = organizer.organize_directory(
args.source,
args.target,
progress_callback=progress_callback
)
end_time = time.time()
# Print summary
print("\n" + "=" * 50)
print("ORGANIZATION COMPLETE")
print("=" * 50)
print(f"Total files processed: {results['total_files']}")
print(f"Files moved: {results['moved']}")
print(f"Files skipped: {results['skipped']}")
print(f"Errors: {results['errors']}")
print(".1f" print(".1f"
if results['categories']:
print("\nFiles by category:")
for category, count in sorted(results['categories'].items()):
print(f" {category:12s}: {count}")
# Save detailed results
results_file = f"organization_results_{int(time.time())}.json"
with open(results_file, 'w') as f:
json.dump(results, f, indent=2, default=str)
print(f"\nDetailed results saved to: {results_file}")
def handle_duplicates(args):
"""Handle duplicates command."""
    # Honor the --no-recursive flag when scanning
    organizer = FileOrganizer({'recursive': not args.no_recursive})
print(f"Scanning for duplicates in: {args.directory}")
print("-" * 50)
# Find duplicates
results = organizer.find_duplicates(args.directory)
stats = results['stats']
print("DUPLICATE SCAN RESULTS")
print("=" * 30)
print(f"Duplicate groups found: {stats['total_duplicate_groups']}")
print(f"Total duplicate files: {stats['total_duplicate_files']}")
print(".1f" print(".1f"
if results['duplicates']:
print("\nDuplicate groups:")
for i, (hash_val, group) in enumerate(list(results['duplicates'].items())[:5], 1):
print(f"\nGroup {i} ({len(group['files'])} files, {group['total_size']} bytes each):")
for file_path in group['files'][:3]: # Show first 3 files
print(f" {file_path}")
if len(group['files']) > 3:
print(f" ... and {len(group['files']) - 3} more")
if args.cleanup and results['suggestions']:
print(f"\nCleaning up {len(results['suggestions'])} duplicate groups...")
cleanup_results = organizer.cleanup_duplicates(
results['suggestions'],
dry_run=args.dry_run
)
print("CLEANUP RESULTS")
print("=" * 20)
print(f"Files processed: {cleanup_results['processed']}")
print(f"Files deleted: {cleanup_results['deleted']}")
print(f"Files backed up: {cleanup_results['backed_up']}")
print(f"Errors: {cleanup_results['errors']}")
print(".1f"
if args.dry_run:
print("\nDRY RUN - No files were actually deleted")
def handle_rules(args):
"""Handle rules command."""
organizer = FileOrganizer()
if args.list:
print("CURRENT ORGANIZATION RULES")
print("=" * 30)
for rule in organizer.rules_engine.rules:
print(f"β’ {rule.name} (priority: {rule.priority})")
print(f" Conditions: {len(rule.conditions)}")
print(f" Actions: {len(rule.actions)}")
elif args.validate:
print(f"Validating rule file: {args.validate}")
# Implementation would validate rule file
print("Rule validation not yet implemented")
elif args.add:
print(f"Adding rules from: {args.add}")
# Implementation would add rules from file
print("Rule addition not yet implemented")
def handle_report(args):
"""Handle report command."""
print(f"Generating report from: {args.results_file}")
try:
with open(args.results_file, 'r') as f:
results = json.load(f)
output_file = args.output or 'organization_report.txt'
with open(output_file, 'w') as f:
f.write("FILEMASTER ORGANIZATION REPORT\n")
f.write("=" * 40 + "\n\n")
# Summary
summary = results.get('summary', {})
f.write(f"Completion Time: {summary.get('completion_time', 'Unknown')}\n")
f.write(".1f" f.write(f"Most Common Category: {summary.get('most_common_category', 'None')}\n\n")
# Statistics
f.write("STATISTICS\n")
f.write("-" * 20 + "\n")
f.write(f"Total Files: {results.get('total_files', 0)}\n")
f.write(f"Processed: {results.get('processed', 0)}\n")
f.write(f"Moved: {results.get('moved', 0)}\n")
f.write(f"Skipped: {results.get('skipped', 0)}\n")
f.write(f"Errors: {results.get('errors', 0)}\n\n")
# Categories
if results.get('categories'):
f.write("FILES BY CATEGORY\n")
f.write("-" * 20 + "\n")
for category, count in sorted(results['categories'].items()):
f.write(f"{category:15s}: {count}\n")
print(f"Report generated: {output_file}")
except Exception as e:
print(f"Error generating report: {e}")
if __name__ == "__main__":
main()
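The if/elif dispatch in `main()` works fine, but as the number of subcommands grows, a dictionary mapping command names to handler functions is a common variation. A minimal, self-contained sketch of that pattern (the handler bodies here are placeholders, not the real `handle_organize`/`handle_duplicates`):

```python
import argparse

# Placeholder handlers standing in for the real ones above
def handle_organize(args):
    return f"organize {args.source}"

def handle_duplicates(args):
    return f"duplicates {args.directory}"

def build_parser():
    parser = argparse.ArgumentParser(prog='filemaster')
    sub = parser.add_subparsers(dest='command')
    sub.add_parser('organize').add_argument('source')
    sub.add_parser('duplicates').add_argument('directory')
    return parser

# Dispatch via lookup instead of an if/elif chain
HANDLERS = {'organize': handle_organize, 'duplicates': handle_duplicates}

args = build_parser().parse_args(['organize', '/tmp/photos'])
print(HANDLERS[args.command](args))  # organize /tmp/photos
```

Adding a new subcommand then only requires registering a parser and one dictionary entry.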
Summary
FileMaster demonstrates advanced Python development:
Core Technologies:
- File system operations with pathlib
- Metadata extraction from various file types
- Rule-based organization engine
- Duplicate detection algorithms
- Command-line interface with rich output
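As a taste of the file-type detection listed above, extension-based categorization can be done with nothing but the standard library's `mimetypes` module. This sketch is illustrative (the `CATEGORY_BY_MIME` mapping and `categorize` helper are not part of FileMaster's actual analyzer):

```python
import mimetypes
from pathlib import Path

# Illustrative mapping from top-level MIME type to a target folder name
CATEGORY_BY_MIME = {
    'image': 'Images',
    'video': 'Videos',
    'audio': 'Music',
    'text': 'Documents',
}

def categorize(path: Path) -> str:
    """Guess a category folder from the file's MIME type."""
    mime, _ = mimetypes.guess_type(path.name)
    if mime is None:
        return 'Other'
    return CATEGORY_BY_MIME.get(mime.split('/')[0], 'Other')

print(categorize(Path('holiday.jpg')))  # Images
print(categorize(Path('notes.txt')))    # Documents
```

A production categorizer would also inspect file contents (magic bytes), since extensions can lie.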
Key Skills:
- Complex class hierarchies and design patterns
- Error handling for file operations
- Configuration management
- Progress tracking and user feedback
- Algorithm design for categorization
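Error handling for file operations usually combines three moves: create the destination directory, resolve name conflicts, and catch `OSError`/`shutil.Error` rather than letting one bad file abort the run. A minimal sketch of the pattern (`safe_move` is an illustrative helper, not FileMaster's real implementation):

```python
import shutil
from pathlib import Path
from typing import Optional

def safe_move(source: Path, target: Path) -> Optional[Path]:
    """Move a file, creating parent dirs and renaming on name conflicts.

    Returns the final destination path, or None on failure.
    """
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        # Avoid overwriting: append a counter to the name on conflict
        candidate = target
        counter = 1
        while candidate.exists():
            candidate = target.with_name(f"{target.stem}_{counter}{target.suffix}")
            counter += 1
        shutil.move(str(source), str(candidate))
        return candidate
    except (OSError, shutil.Error):
        return None
```

Returning the final path (rather than a bare bool) lets the caller record the operation for undo support.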
Advanced Features:
- Custom rules engine with YAML configuration
- Duplicate file detection with hash comparison
- Backup and recovery mechanisms
- Comprehensive logging and reporting
- Dry-run mode for safe testing
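Hash-based duplicate detection boils down to grouping files by a content digest and keeping groups with two or more members. A compact sketch of the core idea (function names are illustrative, not the `DuplicateDetector` API):

```python
import hashlib
from collections import defaultdict
from pathlib import Path

def file_hash(path: Path, chunk_size: int = 65536) -> str:
    """SHA-256 of a file, read in chunks so large files fit in memory."""
    digest = hashlib.sha256()
    with open(path, 'rb') as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()

def find_duplicate_groups(directory: Path) -> dict:
    """Group files by content hash; keep only groups with 2+ files."""
    groups = defaultdict(list)
    for path in directory.rglob('*'):
        if path.is_file():
            groups[file_hash(path)].append(path)
    return {h: files for h, files in groups.items() if len(files) > 1}
```

A common optimization is to group by file size first and hash only the files that share a size, since hashing every byte of every file is the expensive step.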
Production Features:
- Modular architecture for extensibility
- Comprehensive error handling
- Progress callbacks for long operations
- Configuration file support
- Cross-platform file operations
Next Steps:
- Implement the CLI and test basic functionality
- Add comprehensive file type detection
- Create the rules engine with custom patterns
- Implement duplicate detection and cleanup
- Add configuration file support
- Create comprehensive tests
- Package for distribution
Congratulations! You've built an intelligent file organization system!
Ready for the final project? Let's create a Personal Finance Tracker!