Daily Tech Brief

Top startup stories in your inbox

Subscribe Free

Β© 2026 rakrisi Daily

Project 3 - Intelligent File Organizer

Project 3: Intelligent File Organizer

Welcome to your third complete Python application! We’re building an intelligent file organization system that automatically categorizes, sorts, and manages files based on type, content, date, and custom rules.

Project Overview

FileMaster is an automation script that intelligently organizes files by:

  • πŸ“ Automatic categorization - Documents, images, videos, music, archives
  • 🏷️ Content analysis - File type detection and metadata extraction
  • πŸ“… Date-based sorting - Creation, modification, and access dates
  • 🎯 Custom rules - User-defined organization patterns
  • πŸ” Duplicate detection - Find and handle duplicate files
  • πŸ“Š Organization reports - Statistics and summaries
  • πŸ”„ Undo functionality - Safe operations with rollback
  • βš™οΈ Configurable settings - Customizable organization profiles

Learning Objectives

By the end of this project, you’ll be able to:

  • Work extensively with the file system using Python
  • Implement intelligent file categorization algorithms
  • Handle file operations safely with error recovery
  • Create configuration-driven applications
  • Build command-line tools with rich output
  • Implement logging and progress tracking
  • Design undo/rollback functionality

Project Requirements

Core Features

  1. File Analysis

    • Detect file types using MIME types and extensions
    • Extract metadata (creation date, size, etc.)
    • Analyze file content for better categorization
    • Handle various file formats and encodings
  2. Intelligent Organization

    • Automatic folder creation based on categories
    • Date-based subfolder organization
    • Custom naming patterns and rules
    • Conflict resolution for duplicate names
  3. Safety Features

    • Dry-run mode for testing changes
    • Undo functionality to reverse operations
    • Backup creation before major changes
    • Progress tracking and interruption handling
  4. Duplicate Management

    • File hash calculation for duplicate detection
    • Multiple duplicate handling strategies
    • Size and content comparison
    • Duplicate reporting and cleanup

Advanced Features

  1. Custom Rules Engine

    • User-defined organization rules
    • Pattern matching and conditions
    • Priority-based rule application
    • Rule validation and testing
  2. Reporting and Analytics

    • Organization statistics and summaries
    • File type distribution analysis
    • Storage usage reports
    • Operation logs and history
  3. Configuration Management

    • Multiple organization profiles
    • User preferences and settings
    • Rule sets and templates
    • Configuration validation

Project Structure

filemaster/
β”œβ”€β”€ filemaster/
β”‚   β”œβ”€β”€ __init__.py          # Package initialization
β”‚   β”œβ”€β”€ organizer.py         # Main organizer logic
β”‚   β”œβ”€β”€ analyzer.py          # File analysis and categorization
β”‚   β”œβ”€β”€ rules.py             # Custom rules engine
β”‚   β”œβ”€β”€ duplicates.py        # Duplicate file detection
β”‚   β”œβ”€β”€ utils.py             # Helper functions
β”‚   └── config.py            # Configuration management
β”œβ”€β”€ data/
β”‚   β”œβ”€β”€ config.yaml          # Default configuration
β”‚   β”œβ”€β”€ rules/               # Custom rule files
β”‚   └── logs/                # Operation logs
β”œβ”€β”€ tests/
β”‚   β”œβ”€β”€ test_organizer.py
β”‚   β”œβ”€β”€ test_analyzer.py
β”‚   └── test_duplicates.py
β”œβ”€β”€ scripts/
β”‚   β”œβ”€β”€ organize.py          # Main CLI script
β”‚   └── setup.py             # Installation script
β”œβ”€β”€ docs/
β”‚   β”œβ”€β”€ README.md
β”‚   β”œβ”€β”€ rules.md             # Rules documentation
β”‚   └── examples.md          # Usage examples
β”œβ”€β”€ requirements.txt
└── setup.py

Step 1: File Analysis System

Create the core file analysis and categorization system.

# filemaster/analyzer.py
import os
import mimetypes
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import hashlib
from datetime import datetime
import magic  # python-magic for better MIME type detection
from PIL import Image  # For image metadata
import exifread  # For EXIF data
from mutagen import File as AudioFile  # For audio metadata
from hachoir.parser import createParser  # For video metadata
from hachoir.metadata import extractMetadata

class FileAnalyzer:
    """Analyzes files and extracts metadata for organization."""
    
    # File type categories
    CATEGORIES = {
        'documents': {
            'extensions': ['.pdf', '.doc', '.docx', '.txt', '.rtf', '.odt', '.xls', '.xlsx', '.ppt', '.pptx'],
            'mime_types': ['application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument'],
            'folder': 'Documents'
        },
        'images': {
            'extensions': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp', '.svg'],
            'mime_types': ['image/jpeg', 'image/png', 'image/gif', 'image/bmp', 'image/tiff'],
            'folder': 'Images'
        },
        'videos': {
            'extensions': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm'],
            'mime_types': ['video/mp4', 'video/avi', 'video/x-matroska'],
            'folder': 'Videos'
        },
        'audio': {
            'extensions': ['.mp3', '.wav', '.flac', '.aac', '.ogg', '.wma'],
            'mime_types': ['audio/mpeg', 'audio/wav', 'audio/flac'],
            'folder': 'Audio'
        },
        'archives': {
            'extensions': ['.zip', '.rar', '.7z', '.tar', '.gz', '.bz2'],
            'mime_types': ['application/zip', 'application/x-rar-compressed'],
            'folder': 'Archives'
        },
        'code': {
            'extensions': ['.py', '.js', '.html', '.css', '.java', '.cpp', '.c', '.php', '.rb', '.go'],
            'mime_types': ['text/x-python', 'application/javascript', 'text/html'],
            'folder': 'Code'
        },
        'executables': {
            'extensions': ['.exe', '.msi', '.dmg', '.deb', '.rpm', '.app'],
            'mime_types': ['application/x-executable', 'application/x-msi'],
            'folder': 'Executables'
        }
    }
    
    def __init__(self):
        # Initialize MIME type detection
        mimetypes.init()
    
    def analyze_file(self, file_path: str) -> Dict:
        """Analyze a single file and return metadata."""
        path = Path(file_path)
        
        if not path.exists() or not path.is_file():
            raise FileNotFoundError(f"File not found: {file_path}")
        
        # Basic file information
        stat = path.stat()
        metadata = {
            'path': str(path.absolute()),
            'name': path.name,
            'stem': path.stem,
            'suffix': path.suffix.lower(),
            'size': stat.st_size,
            'created': datetime.fromtimestamp(stat.st_ctime),
            'modified': datetime.fromtimestamp(stat.st_mtime),
            'accessed': datetime.fromtimestamp(stat.st_atime),
            'category': self._categorize_file(path),
            'mime_type': self._get_mime_type(path),
            'hash': self._calculate_hash(path),
            'is_hidden': path.name.startswith('.'),
            'is_system': False  # Could be enhanced to detect system files
        }
        
        # Add category-specific metadata
        metadata.update(self._extract_specialized_metadata(path, metadata['category']))
        
        return metadata
    
    def _categorize_file(self, path: Path) -> str:
        """Categorize file based on extension and MIME type."""
        suffix = path.suffix.lower()
        mime_type = self._get_mime_type(path)
        
        # Check each category
        for category, info in self.CATEGORIES.items():
            if suffix in info['extensions']:
                return category
            
            if mime_type and any(mt in mime_type for mt in info['mime_types']):
                return category
        
        # Default category
        return 'other'
    
    def _get_mime_type(self, path: Path) -> Optional[str]:
        """Get MIME type using multiple methods."""
        # Try python-magic first (most accurate)
        try:
            return magic.from_file(str(path), mime=True)
        except:
            pass
        
        # Fallback to mimetypes
        mime_type, _ = mimetypes.guess_type(str(path))
        return mime_type
    
    def _calculate_hash(self, path: Path, algorithm: str = 'md5') -> str:
        """Calculate file hash for duplicate detection."""
        hash_func = hashlib.new(algorithm)
        
        with open(path, 'rb') as f:
            # Read in chunks to handle large files
            for chunk in iter(lambda: f.read(4096), b""):
                hash_func.update(chunk)
        
        return hash_func.hexdigest()
    
    def _extract_specialized_metadata(self, path: Path, category: str) -> Dict:
        """Extract category-specific metadata."""
        metadata = {}
        
        try:
            if category == 'images':
                metadata.update(self._extract_image_metadata(path))
            elif category == 'audio':
                metadata.update(self._extract_audio_metadata(path))
            elif category == 'videos':
                metadata.update(self._extract_video_metadata(path))
            elif category == 'documents':
                metadata.update(self._extract_document_metadata(path))
        except Exception as e:
            # Don't fail analysis if metadata extraction fails
            metadata['extraction_error'] = str(e)
        
        return metadata
    
    def _extract_image_metadata(self, path: Path) -> Dict:
        """Extract image-specific metadata."""
        metadata = {}
        
        try:
            # Basic image info with PIL
            with Image.open(path) as img:
                metadata.update({
                    'width': img.width,
                    'height': img.height,
                    'format': img.format,
                    'mode': img.mode
                })
            
            # EXIF data
            with open(path, 'rb') as f:
                exif_data = exifread.process_file(f)
                
                if exif_data:
                    # Extract common EXIF tags
                    exif_info = {}
                    for tag, value in exif_data.items():
                        if tag in ['EXIF DateTimeOriginal', 'EXIF DateTimeDigitized', 'Image DateTime']:
                            try:
                                # Parse EXIF date format: '2023:12:25 14:30:00'
                                date_str = str(value)
                                exif_info['datetime'] = datetime.strptime(date_str, '%Y:%m:%d %H:%M:%S')
                            except:
                                pass
                        elif tag in ['EXIF Make', 'EXIF Model']:
                            exif_info[tag.split()[-1].lower()] = str(value)
                    
                    metadata['exif'] = exif_info
                    
        except Exception as e:
            metadata['image_error'] = str(e)
        
        return metadata
    
    def _extract_audio_metadata(self, path: Path) -> Dict:
        """Extract audio-specific metadata."""
        metadata = {}
        
        try:
            audio = AudioFile(path)
            
            if audio:
                metadata.update({
                    'length': audio.info.length if hasattr(audio.info, 'length') else None,
                    'bitrate': audio.info.bitrate if hasattr(audio.info, 'bitrate') else None,
                    'sample_rate': audio.info.sample_rate if hasattr(audio.info, 'sample_rate') else None,
                    'channels': audio.info.channels if hasattr(audio.info, 'channels') else None
                })
                
                # Extract tags if available
                if hasattr(audio, 'tags'):
                    tags = {}
                    for key in ['title', 'artist', 'album', 'year', 'genre']:
                        if key in audio.tags:
                            tags[key] = str(audio.tags[key])
                    
                    if tags:
                        metadata['tags'] = tags
                        
        except Exception as e:
            metadata['audio_error'] = str(e)
        
        return metadata
    
    def _extract_video_metadata(self, path: Path) -> Dict:
        """Extract video-specific metadata."""
        metadata = {}
        
        try:
            parser = createParser(str(path))
            if parser:
                with parser:
                    extractor = extractMetadata(parser)
                    if extractor:
                        metadata.update({
                            'duration': extractor.get('duration'),
                            'width': extractor.get('width'),
                            'height': extractor.get('height'),
                            'frame_rate': extractor.get('frame_rate'),
                            'bit_rate': extractor.get('bit_rate'),
                            'format': extractor.get('format')
                        })
        except Exception as e:
            metadata['video_error'] = str(e)
        
        return metadata
    
    def _extract_document_metadata(self, path: Path) -> Dict:
        """Extract document-specific metadata."""
        metadata = {}
        
        # For now, just detect if it's text-based
        mime_type = self._get_mime_type(path)
        if mime_type and mime_type.startswith('text/'):
            metadata['is_text'] = True
            
            # Try to detect encoding and read first few lines
            try:
                with open(path, 'r', encoding='utf-8') as f:
                    first_lines = []
                    for i, line in enumerate(f):
                        if i >= 5:  # First 5 lines
                            break
                        first_lines.append(line.strip())
                    
                    metadata['preview'] = first_lines
            except UnicodeDecodeError:
                metadata['encoding'] = 'binary'
        
        return metadata
    
    def get_category_folder(self, category: str) -> str:
        """Get the folder name for a category."""
        return self.CATEGORIES.get(category, {}).get('folder', 'Other')
    
    def is_organizable_file(self, path: Path) -> bool:
        """Check if a file should be organized."""
        # Skip hidden files, system files, and very small files
        if path.name.startswith('.') or path.stat().st_size < 100:
            return False
        
        return True

Step 2: Rules Engine

Create a flexible rules engine for custom organization logic.

# filemaster/rules.py
import re
from typing import Dict, List, Callable, Any, Optional
from pathlib import Path
from datetime import datetime
import yaml

class OrganizationRule:
    """Represents a single organization rule."""
    
    def __init__(self, name: str, conditions: List[Dict], actions: List[Dict], priority: int = 0):
        self.name = name
        self.conditions = conditions
        self.actions = actions
        self.priority = priority
    
    def evaluate(self, file_metadata: Dict) -> bool:
        """Evaluate if this rule applies to a file."""
        for condition in self.conditions:
            if not self._check_condition(condition, file_metadata):
                return False
        return True
    
    def _check_condition(self, condition: Dict, metadata: Dict) -> bool:
        """Check a single condition."""
        field = condition.get('field')
        operator = condition.get('operator', 'equals')
        value = condition.get('value')
        
        if field not in metadata:
            return False
        
        actual_value = metadata[field]
        
        # Handle different operators
        if operator == 'equals':
            return actual_value == value
        elif operator == 'not_equals':
            return actual_value != value
        elif operator == 'contains':
            return value in str(actual_value)
        elif operator == 'not_contains':
            return value not in str(actual_value)
        elif operator == 'matches_regex':
            return bool(re.match(value, str(actual_value)))
        elif operator == 'greater_than':
            return actual_value > value
        elif operator == 'less_than':
            return actual_value < value
        elif operator == 'in_list':
            return actual_value in value
        elif operator == 'not_in_list':
            return actual_value not in value
        
        return False
    
    def apply_actions(self, file_metadata: Dict, base_path: Path) -> Dict:
        """Apply rule actions to determine new path."""
        new_path = base_path
        
        for action in self.actions:
            action_type = action.get('type')
            
            if action_type == 'set_folder':
                folder = action.get('folder', '')
                # Replace variables in folder name
                folder = self._replace_variables(folder, file_metadata)
                new_path = new_path / folder
                
            elif action_type == 'set_filename':
                filename = action.get('filename', '')
                filename = self._replace_variables(filename, file_metadata)
                new_path = new_path.parent / filename
            
            elif action_type == 'add_date_folder':
                date_field = action.get('date_field', 'created')
                format_str = action.get('format', '%Y-%m-%d')
                
                if date_field in file_metadata:
                    date_obj = file_metadata[date_field]
                    if isinstance(date_obj, datetime):
                        folder_name = date_obj.strftime(format_str)
                        new_path = new_path / folder_name
        
        return {
            'new_path': new_path,
            'rule_applied': self.name
        }
    
    def _replace_variables(self, template: str, metadata: Dict) -> str:
        """Replace variables in template strings."""
        result = template
        
        # Replace {field_name} with metadata values
        for key, value in metadata.items():
            if isinstance(value, datetime):
                # Format dates
                result = result.replace(f'{{{key}}}', value.strftime('%Y-%m-%d_%H-%M-%S'))
                result = result.replace(f'{{{key}:date}}', value.strftime('%Y-%m-%d'))
                result = result.replace(f'{{{key}:year}}', value.strftime('%Y'))
                result = result.replace(f'{{{key}:month}}', value.strftime('%m'))
            else:
                result = result.replace(f'{{{key}}}', str(value))
        
        return result

class RulesEngine:
    """Manages and applies organization rules."""
    
    def __init__(self):
        self.rules: List[OrganizationRule] = []
    
    def add_rule(self, rule: OrganizationRule):
        """Add a rule to the engine."""
        self.rules.append(rule)
        # Sort by priority (higher priority first)
        self.rules.sort(key=lambda r: r.priority, reverse=True)
    
    def load_rules_from_file(self, file_path: str):
        """Load rules from a YAML file."""
        with open(file_path, 'r') as f:
            rules_data = yaml.safe_load(f)
        
        for rule_data in rules_data.get('rules', []):
            rule = OrganizationRule(
                name=rule_data['name'],
                conditions=rule_data.get('conditions', []),
                actions=rule_data.get('actions', []),
                priority=rule_data.get('priority', 0)
            )
            self.add_rule(rule)
    
    def apply_rules(self, file_metadata: Dict, base_path: Path) -> Optional[Dict]:
        """Apply the first matching rule to a file."""
        for rule in self.rules:
            if rule.evaluate(file_metadata):
                return rule.apply_actions(file_metadata, base_path)
        
        return None
    
    def get_matching_rules(self, file_metadata: Dict) -> List[OrganizationRule]:
        """Get all rules that match a file."""
        return [rule for rule in self.rules if rule.evaluate(file_metadata)]
    
    def validate_rule(self, rule_data: Dict) -> List[str]:
        """Validate a rule configuration."""
        errors = []
        
        if 'name' not in rule_data:
            errors.append("Rule must have a 'name' field")
        
        if 'conditions' not in rule_data:
            errors.append("Rule must have 'conditions' field")
        elif not isinstance(rule_data['conditions'], list):
            errors.append("'conditions' must be a list")
        
        if 'actions' not in rule_data:
            errors.append("Rule must have 'actions' field")
        elif not isinstance(rule_data['actions'], list):
            errors.append("'actions' must be a list")
        
        # Validate conditions
        for i, condition in enumerate(rule_data.get('conditions', [])):
            if 'field' not in condition:
                errors.append(f"Condition {i} must have a 'field'")
            if 'operator' not in condition:
                errors.append(f"Condition {i} must have an 'operator'")
        
        # Validate actions
        for i, action in enumerate(rule_data.get('actions', [])):
            if 'type' not in action:
                errors.append(f"Action {i} must have a 'type'")
        
        return errors

# Default rules
DEFAULT_RULES = [
    {
        'name': 'Camera Photos',
        'priority': 10,
        'conditions': [
            {'field': 'category', 'operator': 'equals', 'value': 'images'},
            {'field': 'exif', 'operator': 'not_equals', 'value': None}
        ],
        'actions': [
            {'type': 'set_folder', 'folder': 'Photos/{exif[datetime]:date}'},
            {'type': 'set_filename', 'filename': 'IMG_{exif[datetime]:date}_{name}'}
        ]
    },
    {
        'name': 'Screenshots',
        'priority': 8,
        'conditions': [
            {'field': 'category', 'operator': 'equals', 'value': 'images'},
            {'field': 'name', 'operator': 'matches_regex', 'value': r'.*(screenshot|screen.*shot).*'}
        ],
        'actions': [
            {'type': 'set_folder', 'folder': 'Screenshots'},
            {'type': 'add_date_folder'}
        ]
    },
    {
        'name': 'Music by Artist',
        'priority': 7,
        'conditions': [
            {'field': 'category', 'operator': 'equals', 'value': 'audio'},
            {'field': 'tags', 'operator': 'not_equals', 'value': None}
        ],
        'actions': [
            {'type': 'set_folder', 'folder': 'Music/{tags[artist]}/{tags[album]}'}
        ]
    },
    {
        'name': 'Documents by Date',
        'priority': 5,
        'conditions': [
            {'field': 'category', 'operator': 'equals', 'value': 'documents'}
        ],
        'actions': [
            {'type': 'set_folder', 'folder': 'Documents/{created:year}/{created:month}'}
        ]
    }
]

Step 3: Duplicate Detection

Implement duplicate file detection and handling.

# filemaster/duplicates.py
import os
import hashlib
from pathlib import Path
from typing import Dict, List, Set, Tuple, Optional
from collections import defaultdict
import filecmp
from dataclasses import dataclass

@dataclass
class DuplicateGroup:
    """Represents a group of duplicate files."""
    hash_value: str
    files: List[Path]
    total_size: int
    
    def __post_init__(self):
        self.total_size = sum(os.path.getsize(f) for f in self.files if f.exists())

class DuplicateDetector:
    """Detects and manages duplicate files."""
    
    def __init__(self):
        self.hash_cache: Dict[str, List[Path]] = defaultdict(list)
    
    def scan_directory(self, directory: Path, recursive: bool = True) -> Dict[str, DuplicateGroup]:
        """Scan directory for duplicate files."""
        if not directory.exists() or not directory.is_dir():
            raise ValueError(f"Invalid directory: {directory}")
        
        # Clear previous results
        self.hash_cache.clear()
        
        # Scan files
        pattern = "**/*" if recursive else "*"
        for file_path in directory.glob(pattern):
            if file_path.is_file():
                try:
                    file_hash = self._calculate_file_hash(file_path)
                    self.hash_cache[file_hash].append(file_path)
                except (OSError, IOError) as e:
                    # Skip files that can't be read
                    continue
        
        # Create duplicate groups
        duplicates = {}
        for hash_value, files in self.hash_cache.items():
            if len(files) > 1:
                duplicates[hash_value] = DuplicateGroup(hash_value, files, 0)
        
        return duplicates
    
    def _calculate_file_hash(self, file_path: Path, algorithm: str = 'md5', chunk_size: int = 8192) -> str:
        """Calculate hash of file content."""
        hash_func = hashlib.new(algorithm)
        
        try:
            with open(file_path, 'rb') as f:
                while chunk := f.read(chunk_size):
                    hash_func.update(chunk)
        except (OSError, IOError):
            # Return a special hash for unreadable files
            return f"unreadable_{file_path.stat().st_size}_{file_path.stat().st_mtime}"
        
        return hash_func.hexdigest()
    
    def find_duplicates_by_size(self, directory: Path, recursive: bool = True) -> Dict[int, List[Path]]:
        """Find potential duplicates by file size first (faster)."""
        size_groups: Dict[int, List[Path]] = defaultdict(list)
        
        pattern = "**/*" if recursive else "*"
        for file_path in directory.glob(pattern):
            if file_path.is_file():
                try:
                    size = file_path.stat().st_size
                    if size > 0:  # Skip empty files
                        size_groups[size].append(file_path)
                except OSError:
                    continue
        
        # Filter to groups with multiple files
        return {size: files for size, files in size_groups.items() if len(files) > 1}
    
    def verify_duplicates(self, files: List[Path]) -> List[Path]:
        """Verify which files in a list are actually duplicates."""
        if len(files) < 2:
            return []
        
        # Use the first file as reference
        reference = files[0]
        duplicates = [reference]
        
        for file_path in files[1:]:
            try:
                if filecmp.cmp(reference, file_path, shallow=False):
                    duplicates.append(file_path)
            except OSError:
                continue
        
        return duplicates if len(duplicates) > 1 else []
    
    def get_duplicate_stats(self, duplicates: Dict[str, DuplicateGroup]) -> Dict:
        """Get statistics about duplicates."""
        total_groups = len(duplicates)
        total_files = sum(len(group.files) for group in duplicates.values())
        total_wasted_space = sum(group.total_size * (len(group.files) - 1) for group in duplicates.values())
        
        return {
            'total_duplicate_groups': total_groups,
            'total_duplicate_files': total_files,
            'total_wasted_space': total_wasted_space,
            'average_files_per_group': total_files / total_groups if total_groups > 0 else 0
        }
    
    def suggest_cleanup_actions(self, duplicates: Dict[str, DuplicateGroup]) -> List[Dict]:
        """Suggest actions for cleaning up duplicates."""
        suggestions = []
        
        for hash_value, group in duplicates.items():
            if len(group.files) < 2:
                continue
            
            # Sort by modification time (keep newest)
            sorted_files = sorted(group.files, key=lambda f: f.stat().st_mtime, reverse=True)
            
            suggestions.append({
                'hash': hash_value,
                'files': [str(f) for f in sorted_files],
                'keep': str(sorted_files[0]),
                'delete': [str(f) for f in sorted_files[1:]],
                'wasted_space': group.total_size * (len(group.files) - 1),
                'reason': 'Keep newest file, delete older duplicates'
            })
        
        return suggestions

class DuplicateHandler:
    """Handles duplicate file operations."""
    
    def __init__(self, backup_dir: Optional[Path] = None):
        self.backup_dir = backup_dir or Path.home() / '.filemaster' / 'backups'
        self.backup_dir.mkdir(parents=True, exist_ok=True)
    
    def remove_duplicates(self, duplicates: List[Dict], dry_run: bool = True) -> Dict:
        """Remove duplicate files based on suggestions."""
        results = {
            'processed': 0,
            'deleted': 0,
            'backed_up': 0,
            'errors': 0,
            'total_space_saved': 0
        }
        
        for dup_info in duplicates:
            keep_file = Path(dup_info['keep'])
            delete_files = [Path(f) for f in dup_info['delete']]
            
            for file_path in delete_files:
                try:
                    if not file_path.exists():
                        continue
                    
                    results['processed'] += 1
                    
                    if not dry_run:
                        # Create backup
                        backup_path = self._create_backup(file_path)
                        if backup_path:
                            results['backed_up'] += 1
                        
                        # Delete file
                        file_path.unlink()
                        results['deleted'] += 1
                        results['total_space_saved'] += file_path.stat().st_size
                    
                except Exception as e:
                    results['errors'] += 1
                    print(f"Error processing {file_path}: {e}")
        
        return results
    
    def _create_backup(self, file_path: Path) -> Optional[Path]:
        """Create a backup of a file before deletion."""
        try:
            backup_name = f"{file_path.name}.backup_{int(file_path.stat().st_mtime)}"
            backup_path = self.backup_dir / backup_name
            
            # Handle name conflicts
            counter = 1
            while backup_path.exists():
                stem = file_path.stem
                suffix = file_path.suffix
                backup_name = f"{stem}.backup_{counter}_{int(file_path.stat().st_mtime)}{suffix}"
                backup_path = self.backup_dir / backup_name
                counter += 1
            
            import shutil
            shutil.copy2(file_path, backup_path)
            return backup_path
            
        except Exception as e:
            print(f"Failed to create backup for {file_path}: {e}")
            return None
    
    def restore_from_backup(self, backup_file: Path, target_dir: Path) -> bool:
        """Restore a file from backup."""
        try:
            import shutil
            
            # Extract original filename from backup name
            backup_name = backup_file.name
            if '.backup_' in backup_name:
                # Remove backup suffix to get original name
                original_name = backup_name.split('.backup_')[0] + backup_file.suffix
                target_path = target_dir / original_name
                
                shutil.copy2(backup_file, target_path)
                return True
            else:
                return False
                
        except Exception as e:
            print(f"Failed to restore {backup_file}: {e}")
            return False

Step 4: Main Organizer

Create the core file organization logic.

# filemaster/organizer.py
import os
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Callable
from datetime import datetime
import logging
from collections import defaultdict, Counter
import json

from .analyzer import FileAnalyzer
from .rules import RulesEngine, OrganizationRule
from .duplicates import DuplicateDetector, DuplicateHandler

class FileOrganizer:
    """Main file organization engine."""
    
    def __init__(self, config: Optional[Dict] = None):
        self.config = config or self._get_default_config()
        self.analyzer = FileAnalyzer()
        self.rules_engine = RulesEngine()
        self.duplicate_detector = DuplicateDetector()
        self.duplicate_handler = DuplicateHandler()
        
        # Setup logging
        self._setup_logging()
        
        # Load default rules
        self._load_default_rules()
    
    def _get_default_config(self) -> Dict:
        """Get default configuration."""
        return {
            'dry_run': True,
            'recursive': True,
            'follow_symlinks': False,
            'backup_before_move': True,
            'conflict_resolution': 'rename',  # rename, skip, overwrite
            'date_format': '%Y-%m-%d',
            'log_level': 'INFO',
            'exclude_patterns': ['.*', '*.tmp', '*.bak'],
            'include_hidden': False
        }
    
    def _setup_logging(self):
        """Setup logging configuration."""
        log_level = getattr(logging, self.config.get('log_level', 'INFO').upper())
        
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('filemaster.log'),
                logging.StreamHandler()
            ]
        )
        
        self.logger = logging.getLogger('FileOrganizer')
    
    def _load_default_rules(self):
        """Load default organization rules."""
        from .rules import DEFAULT_RULES
        
        for rule_data in DEFAULT_RULES:
            rule = OrganizationRule(
                name=rule_data['name'],
                conditions=rule_data['conditions'],
                actions=rule_data['actions'],
                priority=rule_data['priority']
            )
            self.rules_engine.add_rule(rule)
    
    def organize_directory(self, source_dir: str, target_dir: Optional[str] = None, 
                          progress_callback: Optional[Callable] = None) -> Dict:
        """Organize files in a directory."""
        source_path = Path(source_dir)
        target_path = Path(target_dir) if target_dir else source_path / 'Organized'
        
        if not source_path.exists() or not source_path.is_dir():
            raise ValueError(f"Source directory does not exist: {source_dir}")
        
        self.logger.info(f"Starting organization of {source_path} to {target_path}")
        
        # Scan files
        files_to_process = self._scan_files(source_path)
        self.logger.info(f"Found {len(files_to_process)} files to process")
        
        # Process files
        results = {
            'total_files': len(files_to_process),
            'processed': 0,
            'moved': 0,
            'skipped': 0,
            'errors': 0,
            'categories': defaultdict(int),
            'operations': []
        }
        
        for i, file_path in enumerate(files_to_process):
            try:
                operation = self._process_file(file_path, target_path)
                results['operations'].append(operation)
                results['processed'] += 1
                
                if operation['action'] == 'moved':
                    results['moved'] += 1
                    results['categories'][operation.get('category', 'other')] += 1
                elif operation['action'] == 'skipped':
                    results['skipped'] += 1
                
                if progress_callback:
                    progress_callback(i + 1, len(files_to_process), operation)
                    
            except Exception as e:
                self.logger.error(f"Error processing {file_path}: {e}")
                results['errors'] += 1
                results['operations'].append({
                    'file': str(file_path),
                    'action': 'error',
                    'error': str(e)
                })
        
        # Generate summary
        results['summary'] = self._generate_summary(results)
        self.logger.info(f"Organization complete: {results['moved']} files moved, {results['errors']} errors")
        
        return results
    
    def _scan_files(self, directory: Path) -> List[Path]:
        """Scan directory for files to process."""
        files = []
        pattern = "**/*" if self.config['recursive'] else "*"
        
        for path in directory.glob(pattern):
            if path.is_file() and self._should_process_file(path):
                files.append(path)
        
        return files
    
    def _should_process_file(self, file_path: Path) -> bool:
        """Check if a file should be processed."""
        # Skip hidden files unless configured to include them
        if not self.config['include_hidden'] and file_path.name.startswith('.'):
            return False
        
        # Check exclude patterns
        for pattern in self.config['exclude_patterns']:
            if file_path.match(pattern):
                return False
        
        # Check if file is organizable
        return self.analyzer.is_organizable_file(file_path)
    
    def _process_file(self, file_path: Path, target_base: Path) -> Dict:
        """Process a single file."""
        # Analyze file
        metadata = self.analyzer.analyze_file(str(file_path))
        category = metadata['category']
        
        # Apply rules
        rule_result = self.rules_engine.apply_rules(metadata, target_base)
        
        if rule_result:
            # Custom rule applied
            new_path = rule_result['new_path']
            rule_name = rule_result['rule_applied']
        else:
            # Default organization
            category_folder = self.analyzer.get_category_folder(category)
            new_path = target_base / category_folder / file_path.name
        
        # Ensure target directory exists
        new_path.parent.mkdir(parents=True, exist_ok=True)
        
        # Handle conflicts
        final_path = self._resolve_conflicts(file_path, new_path)
        
        operation = {
            'file': str(file_path),
            'target': str(final_path),
            'category': category,
            'rule': rule_result['rule_applied'] if rule_result else None,
            'action': 'skipped'
        }
        
        # Perform move operation
        if not self.config['dry_run']:
            if self._move_file(file_path, final_path):
                operation['action'] = 'moved'
            else:
                operation['action'] = 'error'
        else:
            operation['action'] = 'would_move'
        
        return operation
    
    def _resolve_conflicts(self, source_path: Path, target_path: Path) -> Path:
        """Resolve filename conflicts."""
        if not target_path.exists():
            return target_path
        
        strategy = self.config['conflict_resolution']
        
        if strategy == 'skip':
            return source_path  # Don't move
        
        elif strategy == 'overwrite':
            return target_path
        
        elif strategy == 'rename':
            # Add number suffix
            stem = target_path.stem
            suffix = target_path.suffix
            parent = target_path.parent
            
            counter = 1
            while True:
                new_name = f"{stem}_{counter}{suffix}"
                new_path = parent / new_name
                if not new_path.exists():
                    return new_path
                counter += 1
        
        return target_path
    
    def _move_file(self, source: Path, target: Path) -> bool:
        """Move a file with error handling."""
        try:
            if self.config['backup_before_move']:
                # Create backup (simple copy for now)
                backup_path = source.parent / f"{source.name}.backup"
                shutil.copy2(source, backup_path)
            
            shutil.move(str(source), str(target))
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to move {source} to {target}: {e}")
            return False
    
    def _generate_summary(self, results: Dict) -> Dict:
        """Generate organization summary."""
        return {
            'completion_time': datetime.now().isoformat(),
            'success_rate': (results['moved'] / results['total_files'] * 100) if results['total_files'] > 0 else 0,
            'most_common_category': max(results['categories'].items(), key=lambda x: x[1]) if results['categories'] else None,
            'total_size_processed': sum(op.get('size', 0) for op in results['operations'] if 'size' in op)
        }
    
    def find_duplicates(self, directory: str) -> Dict:
        """Find duplicate files in directory."""
        dir_path = Path(directory)
        duplicates = self.duplicate_detector.scan_directory(dir_path, self.config['recursive'])
        stats = self.duplicate_detector.get_duplicate_stats(duplicates)
        
        return {
            'duplicates': {k: {'files': [str(f) for f in v.files], 'total_size': v.total_size} 
                          for k, v in duplicates.items()},
            'stats': stats,
            'suggestions': self.duplicate_detector.suggest_cleanup_actions(duplicates)
        }
    
    def cleanup_duplicates(self, duplicate_suggestions: List[Dict], dry_run: bool = True) -> Dict:
        """Clean up duplicate files."""
        self.duplicate_handler = DuplicateHandler()
        return self.duplicate_handler.remove_duplicates(duplicate_suggestions, dry_run)
    
    def undo_last_operation(self) -> bool:
        """Undo the last organization operation."""
        # This would require storing operation history
        # Implementation would depend on how operations are tracked
        self.logger.warning("Undo functionality not yet implemented")
        return False
    
    def export_report(self, results: Dict, output_file: str):
        """Export organization results to file."""
        with open(output_file, 'w') as f:
            json.dump(results, f, indent=2, default=str)
        
        self.logger.info(f"Report exported to {output_file}")

Step 5: Command-Line Interface

Create the CLI for user interaction.

# scripts/organize.py
#!/usr/bin/env python3
"""
FileMaster - Intelligent File Organizer
Command-line interface for organizing files automatically.
"""

import argparse
import sys
import json
from pathlib import Path
from typing import Optional
import time

from filemaster.organizer import FileOrganizer
from filemaster.config import load_config

def create_parser() -> argparse.ArgumentParser:
    """Create command-line argument parser."""
    parser = argparse.ArgumentParser(
        prog='filemaster',
        description='Intelligent File Organizer - Automatically organize your files'
    )
    
    subparsers = parser.add_subparsers(dest='command', help='Available commands')
    
    # Organize command
    organize_parser = subparsers.add_parser('organize', help='Organize files in directory')
    organize_parser.add_argument('source', help='Source directory to organize')
    organize_parser.add_argument('-t', '--target', help='Target directory (default: source/Organized)')
    organize_parser.add_argument('--dry-run', action='store_true', help='Show what would be done without doing it')
    organize_parser.add_argument('--no-recursive', action='store_true', help='Do not scan subdirectories')
    organize_parser.add_argument('--no-backup', action='store_true', help='Do not create backups before moving')
    organize_parser.add_argument('-c', '--config', help='Configuration file path')
    
    # Duplicates command
    dup_parser = subparsers.add_parser('duplicates', help='Find duplicate files')
    dup_parser.add_argument('directory', help='Directory to scan for duplicates')
    dup_parser.add_argument('--no-recursive', action='store_true', help='Do not scan subdirectories')
    dup_parser.add_argument('--cleanup', action='store_true', help='Automatically clean up duplicates')
    dup_parser.add_argument('--dry-run', action='store_true', help='Show cleanup actions without executing')
    
    # Rules command
    rules_parser = subparsers.add_parser('rules', help='Manage organization rules')
    rules_parser.add_argument('--list', action='store_true', help='List current rules')
    rules_parser.add_argument('--add', help='Add rule from file')
    rules_parser.add_argument('--validate', help='Validate rule file')
    
    # Report command
    report_parser = subparsers.add_parser('report', help='Generate organization report')
    report_parser.add_argument('results_file', help='JSON results file from previous organization')
    report_parser.add_argument('-o', '--output', help='Output file (default: report.txt)')
    
    return parser

def progress_callback(current: int, total: int, operation: dict):
    """Progress callback for organization operations."""
    percent = (current / total) * 100
    action = operation.get('action', 'processing')
    file_name = Path(operation['file']).name
    
    print(f"\r[{percent:5.1f}%] {action}: {file_name}", end='', flush=True)
    
    if current == total:
        print()  # New line at end

def main():
    """Main CLI entry point."""
    parser = create_parser()
    args = parser.parse_args()
    
    if not args.command:
        parser.print_help()
        return
    
    try:
        if args.command == 'organize':
            handle_organize(args)
        elif args.command == 'duplicates':
            handle_duplicates(args)
        elif args.command == 'rules':
            handle_rules(args)
        elif args.command == 'report':
            handle_report(args)
        else:
            print(f"Unknown command: {args.command}")
            sys.exit(1)
            
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)

def handle_organize(args):
    """Handle organize command."""
    # Load configuration
    config = load_config(args.config) if args.config else {}
    
    # Override config with command-line args
    config.update({
        'dry_run': args.dry_run,
        'recursive': not args.no_recursive,
        'backup_before_move': not args.no_backup
    })
    
    # Create organizer
    organizer = FileOrganizer(config)
    
    print(f"FileMaster - Organizing files in: {args.source}")
    if config['dry_run']:
        print("DRY RUN MODE - No files will be moved")
    print("-" * 50)
    
    start_time = time.time()
    
    # Organize files
    results = organizer.organize_directory(
        args.source, 
        args.target,
        progress_callback=progress_callback
    )
    
    end_time = time.time()
    
    # Print summary
    print("\n" + "=" * 50)
    print("ORGANIZATION COMPLETE")
    print("=" * 50)
    print(f"Total files processed: {results['total_files']}")
    print(f"Files moved: {results['moved']}")
    print(f"Files skipped: {results['skipped']}")
    print(f"Errors: {results['errors']}")
    print(".1f"    print(".1f"    
    if results['categories']:
        print("\nFiles by category:")
        for category, count in sorted(results['categories'].items()):
            print(f"  {category:12s}: {count}")
    
    # Save detailed results
    results_file = f"organization_results_{int(time.time())}.json"
    with open(results_file, 'w') as f:
        json.dump(results, f, indent=2, default=str)
    
    print(f"\nDetailed results saved to: {results_file}")

def handle_duplicates(args):
    """Handle duplicates command."""
    organizer = FileOrganizer()
    
    print(f"Scanning for duplicates in: {args.directory}")
    print("-" * 50)
    
    # Find duplicates
    results = organizer.find_duplicates(args.directory)
    
    stats = results['stats']
    print("DUPLICATE SCAN RESULTS")
    print("=" * 30)
    print(f"Duplicate groups found: {stats['total_duplicate_groups']}")
    print(f"Total duplicate files: {stats['total_duplicate_files']}")
    print(".1f"    print(".1f"    
    if results['duplicates']:
        print("\nDuplicate groups:")
        for i, (hash_val, group) in enumerate(list(results['duplicates'].items())[:5], 1):
            print(f"\nGroup {i} ({len(group['files'])} files, {group['total_size']} bytes each):")
            for file_path in group['files'][:3]:  # Show first 3 files
                print(f"  {file_path}")
            if len(group['files']) > 3:
                print(f"  ... and {len(group['files']) - 3} more")
    
    if args.cleanup and results['suggestions']:
        print(f"\nCleaning up {len(results['suggestions'])} duplicate groups...")
        
        cleanup_results = organizer.cleanup_duplicates(
            results['suggestions'], 
            dry_run=args.dry_run
        )
        
        print("CLEANUP RESULTS")
        print("=" * 20)
        print(f"Files processed: {cleanup_results['processed']}")
        print(f"Files deleted: {cleanup_results['deleted']}")
        print(f"Files backed up: {cleanup_results['backed_up']}")
        print(f"Errors: {cleanup_results['errors']}")
        print(".1f"        
        if args.dry_run:
            print("\nDRY RUN - No files were actually deleted")

def handle_rules(args):
    """Handle rules command."""
    organizer = FileOrganizer()
    
    if args.list:
        print("CURRENT ORGANIZATION RULES")
        print("=" * 30)
        for rule in organizer.rules_engine.rules:
            print(f"β€’ {rule.name} (priority: {rule.priority})")
            print(f"  Conditions: {len(rule.conditions)}")
            print(f"  Actions: {len(rule.actions)}")
    
    elif args.validate:
        print(f"Validating rule file: {args.validate}")
        # Implementation would validate rule file
        print("Rule validation not yet implemented")
    
    elif args.add:
        print(f"Adding rules from: {args.add}")
        # Implementation would add rules from file
        print("Rule addition not yet implemented")

def handle_report(args):
    """Handle report command."""
    print(f"Generating report from: {args.results_file}")
    
    try:
        with open(args.results_file, 'r') as f:
            results = json.load(f)
        
        output_file = args.output or 'organization_report.txt'
        
        with open(output_file, 'w') as f:
            f.write("FILEMASTER ORGANIZATION REPORT\n")
            f.write("=" * 40 + "\n\n")
            
            # Summary
            summary = results.get('summary', {})
            f.write(f"Completion Time: {summary.get('completion_time', 'Unknown')}\n")
            f.write(".1f"            f.write(f"Most Common Category: {summary.get('most_common_category', 'None')}\n\n")
            
            # Statistics
            f.write("STATISTICS\n")
            f.write("-" * 20 + "\n")
            f.write(f"Total Files: {results.get('total_files', 0)}\n")
            f.write(f"Processed: {results.get('processed', 0)}\n")
            f.write(f"Moved: {results.get('moved', 0)}\n")
            f.write(f"Skipped: {results.get('skipped', 0)}\n")
            f.write(f"Errors: {results.get('errors', 0)}\n\n")
            
            # Categories
            if results.get('categories'):
                f.write("FILES BY CATEGORY\n")
                f.write("-" * 20 + "\n")
                for category, count in sorted(results['categories'].items()):
                    f.write(f"{category:15s}: {count}\n")
        
        print(f"Report generated: {output_file}")
        
    except Exception as e:
        print(f"Error generating report: {e}")

if __name__ == "__main__":
    main()

Summary

FileMaster demonstrates advanced Python development:

Core Technologies:

  • File system operations with pathlib
  • Metadata extraction from various file types
  • Rule-based organization engine
  • Duplicate detection algorithms
  • Command-line interface with rich output

Key Skills:

  • Complex class hierarchies and design patterns
  • Error handling for file operations
  • Configuration management
  • Progress tracking and user feedback
  • Algorithm design for categorization

Advanced Features:

  • Custom rules engine with YAML configuration
  • Duplicate file detection with hash comparison
  • Backup and recovery mechanisms
  • Comprehensive logging and reporting
  • Dry-run mode for safe testing

Production Features:

  • Modular architecture for extensibility
  • Comprehensive error handling
  • Progress callbacks for long operations
  • Configuration file support
  • Cross-platform file operations

Next Steps:

  1. Implement the CLI and test basic functionality
  2. Add comprehensive file type detection
  3. Create the rules engine with custom patterns
  4. Implement duplicate detection and cleanup
  5. Add configuration file support
  6. Create comprehensive tests
  7. Package for distribution

Congratulations! You’ve built an intelligent file organization system! πŸ“

Ready for the final project? Let’s create a Personal Finance Tracker! πŸ’°