from flask import Flask, render_template, request, jsonify, send_file, session, redirect, url_for from flask_sqlalchemy import SQLAlchemy from werkzeug.utils import secure_filename from datetime import datetime import os import sys import pytesseract from PIL import Image import pdf2image import re import io import pandas as pd import hashlib import json from functools import wraps import uuid import logging from logging.handlers import RotatingFileHandler # Initialize Flask app app = Flask(__name__) # Load configuration app.config.from_object('config.Config') # Initialize database db = SQLAlchemy(app) # Setup logging if not app.debug: if not os.path.exists('logs'): os.mkdir('logs') file_handler = RotatingFileHandler('logs/financial_dms.log', maxBytes=10240, backupCount=10) file_handler.setFormatter(logging.Formatter( '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]' )) file_handler.setLevel(logging.INFO) app.logger.addHandler(file_handler) app.logger.setLevel(logging.INFO) app.logger.info('Financial DMS startup') # Ensure upload directory exists os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) # ==================== Database Models ==================== class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) email = db.Column(db.String(120), unique=True, nullable=False) password = db.Column(db.String(200), nullable=False) company_name = db.Column(db.String(100)) created_at = db.Column(db.DateTime, default=datetime.utcnow) def set_password(self, password): self.password = hashlib.sha256(password.encode()).hexdigest() def check_password(self, password): return self.password == hashlib.sha256(password.encode()).hexdigest() class Company(db.Model): __tablename__ = 'companies' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100), nullable=False) user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) class Transaction(db.Model): __tablename__ = 'transactions' id = db.Column(db.Integer, primary_key=True) company_id = db.Column(db.Integer, db.ForeignKey('companies.id'), nullable=False) date = db.Column(db.DateTime, nullable=False) description = db.Column(db.String(200)) amount = db.Column(db.Float, nullable=False) category = db.Column(db.String(50)) type = db.Column(db.String(10)) # 'income' or 'expense' payment_method = db.Column(db.String(50)) document_url = db.Column(db.String(200)) created_at = db.Column(db.DateTime, default=datetime.utcnow) class Document(db.Model): __tablename__ = 'documents' id = db.Column(db.Integer, primary_key=True) company_id = db.Column(db.Integer, db.ForeignKey('companies.id'), nullable=False) filename = db.Column(db.String(200)) file_type = db.Column(db.String(50)) # 'receipt', 'bank_statement' ocr_text = db.Column(db.Text) processed_data = db.Column(db.Text) # JSON stored as text upload_date = db.Column(db.DateTime, default=datetime.utcnow) # ==================== Authentication Decorator ==================== def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session: if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({'error': 'Authentication required'}), 401 return redirect(url_for('index')) return f(*args, **kwargs) return decorated_function # ==================== OCR Processor Class ==================== class OCRProcessor: def __init__(self): # Configure Tesseract path for production if os.name == 'nt': # Windows pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe' else: # Linux (VPS) # Check common installation paths possible_paths = [ '/usr/bin/tesseract', '/usr/local/bin/tesseract' ] for path in possible_paths: if os.path.exists(path): pytesseract.pytesseract.tesseract_cmd = path break def process_image(self, image_file): """Process image file and extract text""" try: image = Image.open(image_file) text = pytesseract.image_to_string(image) return self.extract_financial_data(text) except Exception as e: app.logger.error(f"OCR processing error: {str(e)}") return {'error': str(e), 'text': ''} def process_pdf(self, pdf_file): """Process PDF file and extract text""" try: # Save temporarily temp_path = os.path.join(app.config['UPLOAD_FOLDER'], 'temp_' + str(uuid.uuid4()) + '.pdf') pdf_file.save(temp_path) # Convert PDF to images images = pdf2image.convert_from_path(temp_path) text = "" for image in images: text += pytesseract.image_to_string(image) # Clean up os.remove(temp_path) return self.extract_financial_data(text) except Exception as e: app.logger.error(f"PDF processing error: {str(e)}") return {'error': str(e), 'text': ''} def extract_financial_data(self, text): """Extract financial information from text""" data = { 'transactions': [], 'total_amount': 0, 'date': None, 'vendor': None, 'raw_text': text[:500] } # Extract dates date_pattern = r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}' dates = re.findall(date_pattern, text) if dates: data['date'] = dates[0] # Extract vendor/company name vendor_patterns = [ r'(?:Store|Vendor|Company|From)[:\s]+([A-Za-z\s]+)', r'^([A-Z][A-Za-z\s]+)$' ] for pattern in vendor_patterns: vendor_match = re.search(pattern, text, re.MULTILINE) if vendor_match: data['vendor'] = vendor_match.group(1).strip() break # Extract amounts and create transactions lines = text.split('\n') for line in lines: amount = self.extract_amount(line) if amount > 0 and len(line.strip()) > 3: transaction = { 'description': line.strip()[:100], 'amount': amount } data['transactions'].append(transaction) data['total_amount'] += amount return data def extract_amount(self, text): """Extract numerical amount from text""" amount_match = re.search(r'[$€£]?\s*(\d+[,.]?\d*)', text) if amount_match: amount_str = amount_match.group(1).replace(',', '') try: return float(amount_str) except: return 0 return 0 # Initialize OCR processor ocr_processor = OCRProcessor() # ==================== Routes ==================== # Serve HTML pages @app.route('/') def index(): return render_template('index.html') @app.route('/dashboard') @login_required def dashboard(): return render_template('dashboard.html') # ==================== API Routes ==================== @app.route('/api/register', methods=['POST']) def register(): data = request.json # Check if user exists if User.query.filter_by(username=data['username']).first(): return jsonify({'error': 'Username already exists'}), 400 if User.query.filter_by(email=data['email']).first(): return jsonify({'error': 'Email already exists'}), 400 # Create new user user = User( username=data['username'], email=data['email'], company_name=data.get('company_name', '') ) user.set_password(data['password']) db.session.add(user) db.session.commit() app.logger.info(f"New user registered: {data['username']}") return jsonify({'message': 'User created successfully'}), 201 @app.route('/api/login', methods=['POST']) def login(): data = request.json user = User.query.filter_by(username=data['username']).first() if user and user.check_password(data['password']): session['user_id'] = user.id session['username'] = user.username app.logger.info(f"User logged in: {user.username}") return jsonify({ 'message': 'Logged in successfully', 'user_id': user.id, 'username': user.username }) return jsonify({'error': 'Invalid credentials'}), 401 @app.route('/api/logout', methods=['POST']) def logout(): username = session.get('username') session.clear() if username: app.logger.info(f"User logged out: {username}") return jsonify({'message': 'Logged out successfully'}) @app.route('/api/check-auth', methods=['GET']) def check_auth(): if 'user_id' in session: return jsonify({'authenticated': True, 'username': session.get('username')}) return jsonify({'authenticated': False}) @app.route('/api/companies', methods=['GET']) @login_required def get_companies(): companies = Company.query.filter_by(user_id=session['user_id']).all() return jsonify([{ 'id': c.id, 'name': c.name, 'created_at': c.created_at.isoformat() if c.created_at else None } for c in companies]) @app.route('/api/companies', methods=['POST']) @login_required def create_company(): data = request.json company = Company( name=data['name'], user_id=session['user_id'] ) db.session.add(company) db.session.commit() app.logger.info(f"New company created: {data['name']} by user {session['username']}") return jsonify({'message': 'Company created', 'id': company.id}) @app.route('/api/upload-document', methods=['POST']) @login_required def upload_document(): if 'file' not in request.files: return jsonify({'error': 'No file provided'}), 400 file = request.files['file'] company_id = request.form.get('company_id') doc_type = request.form.get('type', 'receipt') if not company_id: return jsonify({'error': 'Company ID required'}), 400 # Verify company belongs to user company = Company.query.filter_by(id=company_id, user_id=session['user_id']).first() if not company: return jsonify({'error': 'Company not found'}), 404 if file.filename == '': return jsonify({'error': 'No file selected'}), 400 def allowed_file(filename): ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'pdf'} return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS if allowed_file(file.filename): filename = secure_filename(file.filename) # Add timestamp to prevent filename conflicts name, ext = os.path.splitext(filename) filename = f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}{ext}" filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(filepath) # Process with OCR with open(filepath, 'rb') as f: if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')): extracted_data = ocr_processor.process_image(f) elif filename.lower().endswith('.pdf'): extracted_data = ocr_processor.process_pdf(f) else: extracted_data = {'text': 'File type not supported for OCR'} # Save document record document = Document( company_id=company_id, filename=filename, file_type=doc_type, ocr_text=extracted_data.get('raw_text', ''), processed_data=json.dumps(extracted_data) ) db.session.add(document) # Create transactions from extracted data if 'transactions' in extracted_data: for trans_data in extracted_data['transactions']: transaction = Transaction( company_id=company_id, date=datetime.now(), description=trans_data.get('description', ''), amount=trans_data.get('amount', 0), category='Uncategorized', type='expense' if doc_type == 'receipt' else 'unknown', document_url=filename ) db.session.add(transaction) db.session.commit() app.logger.info(f"Document uploaded: {filename} for company {company_id}") return jsonify({ 'message': 'Document processed successfully', 'document_id': document.id, 'extracted_data': extracted_data }) return jsonify({'error': 'File type not allowed'}), 400 @app.route('/api/transactions', methods=['GET']) @login_required def get_transactions(): company_id = request.args.get('company_id') if not company_id: return jsonify({'error': 'Company ID required'}), 400 # Verify company belongs to user company = Company.query.filter_by(id=company_id, user_id=session['user_id']).first() if not company: return jsonify({'error': 'Company not found'}), 404 query = Transaction.query.filter_by(company_id=company_id) # Apply date filters if provided start_date = request.args.get('start_date') end_date = request.args.get('end_date') if start_date: query = query.filter(Transaction.date >= datetime.fromisoformat(start_date)) if end_date: query = query.filter(Transaction.date <= datetime.fromisoformat(end_date)) # Apply category filter if provided category = request.args.get('category') if category: query = query.filter(Transaction.category == category) transactions = query.order_by(Transaction.date.desc()).all() return jsonify([{ 'id': t.id, 'company_id': t.company_id, 'date': t.date.isoformat(), 'description': t.description, 'amount': t.amount, 'category': t.category, 'type': t.type, 'payment_method': t.payment_method } for t in transactions]) @app.route('/api/transactions', methods=['POST']) @login_required def add_transaction(): data = request.json # Verify company belongs to user company = Company.query.filter_by(id=data['company_id'], user_id=session['user_id']).first() if not company: return jsonify({'error': 'Company not found'}), 404 transaction = Transaction( company_id=data['company_id'], date=datetime.fromisoformat(data['date']), description=data['description'], amount=data['amount'], category=data.get('category', 'Uncategorized'), type=data.get('type', 'expense'), payment_method=data.get('payment_method') ) db.session.add(transaction) db.session.commit() return jsonify({'message': 'Transaction added', 'id': transaction.id}) @app.route('/api/financial-summary', methods=['GET']) @login_required def financial_summary(): company_id = request.args.get('company_id') if not company_id: return jsonify({'error': 'Company ID required'}), 400 # Verify company belongs to user company = Company.query.filter_by(id=company_id, user_id=session['user_id']).first() if not company: return jsonify({'error': 'Company not found'}), 404 transactions = Transaction.query.filter_by(company_id=company_id).all() # Calculate summary total_income = sum(t.amount for t in transactions if t.type == 'income') total_expenses = sum(t.amount for t in transactions if t.type == 'expense') # Group by category category_summary = {} for t in transactions: if t.category not in category_summary: category_summary[t.category] = 0 category_summary[t.category] += t.amount return jsonify({ 'total_income': round(total_income, 2), 'total_expenses': round(total_expenses, 2), 'net_income': round(total_income - total_expenses, 2), 'transaction_count': len(transactions), 'category_summary': category_summary }) @app.route('/api/export', methods=['GET']) @login_required def export_data(): company_id = request.args.get('company_id') if not company_id: return jsonify({'error': 'Company ID required'}), 400 # Verify company belongs to user company = Company.query.filter_by(id=company_id, user_id=session['user_id']).first() if not company: return jsonify({'error': 'Company not found'}), 404 transactions = Transaction.query.filter_by(company_id=company_id).order_by(Transaction.date).all() # Convert to list of dicts data = [{ 'Date': t.date.strftime('%Y-%m-%d'), 'Description': t.description, 'Amount': t.amount, 'Category': t.category, 'Type': t.type, 'Payment Method': t.payment_method } for t in transactions] # Create CSV df = pd.DataFrame(data) output = io.BytesIO() df.to_csv(output, index=False) output.seek(0) return send_file( output, mimetype='text/csv', as_attachment=True, download_name=f'financial_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv' ) @app.route('/api/delete-transaction/', methods=['DELETE']) @login_required def delete_transaction(transaction_id): transaction = Transaction.query.get_or_404(transaction_id) # Verify company belongs to user company = Company.query.filter_by(id=transaction.company_id, user_id=session['user_id']).first() if not company: return jsonify({'error': 'Unauthorized'}), 403 db.session.delete(transaction) db.session.commit() app.logger.info(f"Transaction {transaction_id} deleted") return jsonify({'message': 'Transaction deleted'}) # Error handlers @app.errorhandler(404) def not_found_error(error): return jsonify({'error': 'Not found'}), 404 @app.errorhandler(500) def internal_error(error): db.session.rollback() app.logger.error(f"Server Error: {error}") return jsonify({'error': 'Internal server error'}), 500 # Create database tables with app.app_context(): db.create_all() app.logger.info("Database tables created/verified")