diff --git a/database/schema.sql b/database/schema.sql index 64e0a8e..ffc7c19 100644 --- a/database/schema.sql +++ b/database/schema.sql @@ -18,39 +18,36 @@ CREATE TABLE IF NOT EXISTS wallets ( INDEX idx_trade_count (trade_count) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Tracked wallet addresses from Hyperliquid trades'; --- Wallet PnL snapshots table (optional - for historical tracking) -CREATE TABLE IF NOT EXISTS wallet_pnl_snapshots ( +-- Note: PnL is now calculated from trades table, so wallet_pnl_snapshots table is no longer used + +-- Trades table to store individual trades for each wallet +CREATE TABLE IF NOT EXISTS trades ( id INT AUTO_INCREMENT PRIMARY KEY, wallet_id INT NOT NULL, wallet_address VARCHAR(66) NOT NULL, - pnl DECIMAL(30, 8) COMMENT 'Profit and Loss value', - account_value DECIMAL(30, 8) COMMENT 'Total account value', - unrealized_pnl DECIMAL(30, 8) COMMENT 'Unrealized PnL', - snapshot_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + entry_hash VARCHAR(66) NOT NULL UNIQUE COMMENT 'Unique identifier for the trade entry (transaction hash)', + entry_date TIMESTAMP NULL COMMENT 'When the position was opened', + close_date TIMESTAMP NULL COMMENT 'When the position was closed (NULL if still open)', + coin VARCHAR(20) NOT NULL COMMENT 'Trading pair/coin symbol', + amount DECIMAL(30, 8) NOT NULL COMMENT 'Trade size/amount', + entry_price DECIMAL(30, 8) NOT NULL COMMENT 'Price when position was opened', + close_price DECIMAL(30, 8) NULL COMMENT 'Price when position was closed (NULL if still open)', + direction ENUM('buy', 'sell') NOT NULL DEFAULT 'buy' COMMENT 'Trade direction: buy or sell', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, FOREIGN KEY (wallet_id) REFERENCES wallets(id) ON DELETE CASCADE, INDEX idx_wallet_id (wallet_id), INDEX idx_wallet_address (wallet_address), - INDEX idx_snapshot_at (snapshot_at) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Historical PnL snapshots for tracked wallets'; + INDEX idx_entry_hash (entry_hash), + INDEX idx_coin (coin), + INDEX idx_entry_date (entry_date), + INDEX idx_close_date (close_date), + INDEX idx_direction (direction) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Individual trades for tracked wallets'; --- View for wallets with latest PnL -CREATE OR REPLACE VIEW wallets_with_latest_pnl AS -SELECT - w.id, - w.address, - w.first_seen_at, - w.last_seen_at, - w.trade_count, - wps.pnl, - wps.account_value, - wps.unrealized_pnl, - wps.snapshot_at as last_pnl_snapshot -FROM wallets w -LEFT JOIN wallet_pnl_snapshots wps ON w.id = wps.wallet_id -LEFT JOIN ( - SELECT wallet_id, MAX(snapshot_at) as max_snapshot - FROM wallet_pnl_snapshots - GROUP BY wallet_id -) latest ON w.id = latest.wallet_id AND wps.snapshot_at = latest.max_snapshot; +-- Migration: Add direction column if it doesn't exist +ALTER TABLE trades ADD COLUMN IF NOT EXISTS direction ENUM('buy', 'sell') NOT NULL DEFAULT 'buy' COMMENT 'Trade direction: buy or sell' AFTER close_price; +ALTER TABLE trades ADD INDEX IF NOT EXISTS idx_direction (direction); + +-- Note: PnL is now calculated on-demand from trades table, so the view is no longer needed diff --git a/package-lock.json b/package-lock.json index daead35..154d466 100644 --- a/package-lock.json +++ b/package-lock.json @@ -665,6 +665,7 @@ "integrity": "sha512-VyKBr25BuFDzBFCK5sUM6ZXiWfqgCTwTAOK8qzGV/m9FCirXYDlmczJ+d5dXBAQALGCdRRdbteKYfJ84NGEusw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "undici-types": "~6.21.0" } @@ -1875,6 +1876,7 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "devOptional": true, "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" diff --git a/src/database.ts b/src/database.ts index 9ecc374..a92a83a 100644 --- a/src/database.ts +++ b/src/database.ts @@ -6,57 +6,94 @@ import * as path from 'path'; const envLocalPath = path.join(process.cwd(), '.env.local'); const envPath = path.join(process.cwd(), '.env'); -dotenv.config({ path: envLocalPath }); -dotenv.config({ path: envPath }); // .env.local values will override .env +const envLocalResult = dotenv.config({ path: envLocalPath }); +if (envLocalResult.error) { + if (envLocalResult.error.message && !envLocalResult.error.message.includes('ENOENT')) { + console.warn('[ENV] Warning loading .env.local:', envLocalResult.error.message); + } +} else { + console.log('[ENV] โœ… Loaded .env.local from:', envLocalPath); +} + +const envResult = dotenv.config({ path: envPath }); +if (envResult.error) { + if (envResult.error.message && !envResult.error.message.includes('ENOENT')) { + console.warn('[ENV] Warning loading .env:', envResult.error.message); + } +} else { + console.log('[ENV] โœ… Loaded .env from:', envPath); +} import mysql from 'mysql2/promise'; // Database configuration -const dbConfig: mysql.PoolOptions = { - host: process.env.DB_HOST || 'localhost', - port: parseInt(process.env.DB_PORT || '3306'), - user: process.env.DB_USER || 'root', - password: process.env.DB_PASSWORD || '', - database: process.env.DB_NAME || 'hyperliquid_tracker', - waitForConnections: true, - connectionLimit: 10, - queueLimit: 0, - // Connection timeout settings - connectTimeout: 60000, // 60 seconds - // Enable keep-alive to prevent connection from being closed - enableKeepAlive: true, - keepAliveInitialDelay: 0, - // For remote connections, often need SSL or allow insecure connections - ...(process.env.DB_SSL === 'true' ? { - ssl: { - rejectUnauthorized: process.env.DB_SSL_REJECT_UNAUTHORIZED !== 'false' - } - } : {}), - // Additional connection options - timezone: 'Z', // Use UTC - charset: 'utf8mb4', - // Multiple statements can help with some connection issues - multipleStatements: false, - // Flags for better compatibility - flags: [ - '-FOUND_ROWS', - '-IGNORE_SPACE', - '-LONG_PASSWORD', - '-LONG_FLAG', - '-TRANSACTIONS', - '-PROTOCOL_41', - '-SECURE_CONNECTION', - ], +// Helper to get env var with fallback, handling empty strings +const getEnvVar = (key: string, defaultValue: string): string => { + const value = process.env[key]; + if (!value || !value.trim()) { + return defaultValue; + } + return value.trim(); }; +let dbConfig: mysql.PoolOptions = getDBConfig(); + +function getDBConfig(): mysql.PoolOptions { + return { + host: getEnvVar('DB_HOST', 'localhost'), + port: parseInt(getEnvVar('DB_PORT', '3306')), + user: getEnvVar('DB_USER', 'hypr'), + password: process.env.DB_PASSWORD, + database: getEnvVar('DB_NAME', 'hyperliquid_tracker'), + waitForConnections: true, + connectionLimit: 10, + queueLimit: 0, + // Connection timeout settings + connectTimeout: 60000, // 60 seconds + // Enable keep-alive to prevent connection from being closed + enableKeepAlive: true, + keepAliveInitialDelay: 0, + // For remote connections, often need SSL or allow insecure connections + ...(process.env.DB_SSL === 'true' ? { + ssl: { + rejectUnauthorized: process.env.DB_SSL_REJECT_UNAUTHORIZED !== 'false' + } + } : {}), + // Additional connection options + timezone: 'Z', // Use UTC + charset: 'utf8mb4', + // Multiple statements can help with some connection issues + multipleStatements: false, + // Flags for better compatibility + flags: [ + '-FOUND_ROWS', + '-IGNORE_SPACE', + '-LONG_PASSWORD', + '-LONG_FLAG', + '-TRANSACTIONS', + '-PROTOCOL_41', + '-SECURE_CONNECTION', + ], + }; +} + console.log('[DB] Database config loaded:', { host: dbConfig.host, port: dbConfig.port, database: dbConfig.database, user: dbConfig.user, passwordSet: !!dbConfig.password, + passwordLength: dbConfig.password ? dbConfig.password.length : 0, ssl: dbConfig.ssl ? 'enabled' : 'disabled', }); +console.log('[DB] Raw environment variables:', { + DB_HOST: process.env.DB_HOST ? `"${process.env.DB_HOST}"` : '(not set)', + DB_PORT: process.env.DB_PORT ? `"${process.env.DB_PORT}"` : '(not set)', + DB_USER: process.env.DB_USER !== undefined ? `"${process.env.DB_USER}" (length: ${process.env.DB_USER.length})` : '(not set)', + DB_NAME: process.env.DB_NAME ? `"${process.env.DB_NAME}"` : '(not set)', + DB_PASSWORD: process.env.DB_PASSWORD ? `***${process.env.DB_PASSWORD.length} chars***` : '(not set)', +}); +console.log('[DB] getEnvVar result for DB_USER:', getEnvVar('DB_USER', 'hypr')); // Create connection pool let pool: mysql.Pool | null = null; @@ -65,15 +102,19 @@ let pool: mysql.Pool | null = null; * Initialize database connection pool */ export function initDatabase(): mysql.Pool { + console.log('getting db config'); + + if (!pool) { - pool = mysql.createPool(dbConfig); - console.log('[DB] โœ… Database connection pool created'); - console.log('[DB] Config:', { + pool = mysql.createPool({ host: dbConfig.host, port: dbConfig.port, - database: dbConfig.database, user: dbConfig.user, + password: dbConfig.password, + database: dbConfig.database, + connectTimeout: 10000, }); + console.log('[DB] โœ… Database connection pool created'); // Handle connection events pool.on('connection', (connection: any) => { @@ -110,15 +151,18 @@ export async function testConnection(maxRetries: number = 3): Promise { try { // Create a direct connection for testing (not using pool) const directConnection = await mysql.createConnection({ - ...dbConfig, - // Reduce timeout for faster failure detection + host: dbConfig.host, + port: dbConfig.port, + user: dbConfig.user, + password: dbConfig.password, + database: dbConfig.database, connectTimeout: 10000, }); console.log('[DB] ๐Ÿ”Œ Direct connection established'); // Immediately test the connection - const [rows] = await directConnection.query('SELECT 1 as test, DATABASE() as current_db, USER() as current_user, VERSION() as version'); + const [rows] = await directConnection.query('SELECT 1 as test, DATABASE() as current_db, USER() as `current_user`, VERSION() as version'); console.log('[DB] โœ… Test query successful:', rows); await directConnection.ping(); @@ -150,6 +194,7 @@ export async function testConnection(maxRetries: number = 3): Promise { return true; } catch (poolError: any) { + console.error('[DB] Pool connection error:', poolError); console.error(`[DB] โŒ Pool connection failed (attempt ${attempt}/${maxRetries}):`, poolError.message); if (attempt < maxRetries) { @@ -186,8 +231,8 @@ export async function testConnection(maxRetries: number = 3): Promise { connectTimeout: 10000, }; - const testConnection = await mysql.createConnection(testConfig); - const [testRows] = await testConnection.query('SELECT 1 as test, USER() as current_user'); + const testConnection = await mysql.createConnection(dbConfig); + const [testRows] = await testConnection.query('SELECT 1 as test, USER() as `current_user`'); console.log('[DB] โœ… Connection without database works:', testRows); console.log('[DB] ๐Ÿ’ก Issue may be with database name or permissions on that database'); @@ -239,6 +284,12 @@ export async function loadWalletsFromDB(): Promise { */ export async function addWalletToDB(wallet: string): Promise { try { + // Validate wallet address length (Ethereum addresses are exactly 42 characters: 0x + 40 hex chars) + if (!wallet || wallet.length !== 42 || !wallet.startsWith('0x')) { + console.log(`[DB] โš ๏ธ Invalid wallet format/length (expected 42 chars), skipping:`, wallet?.substring(0, 50)); + return false; + } + const normalizedWallet = wallet.toLowerCase(); const connectionPool = getPool(); @@ -258,6 +309,20 @@ export async function addWalletToDB(wallet: string): Promise { } } +/** + * Delete wallet from database (e.g. when invalid length like tx hash) + */ +export async function deleteWalletFromDB(wallet: string): Promise { + try { + const connectionPool = getPool(); + await connectionPool.execute('DELETE FROM wallets WHERE address = ?', [wallet]); + return true; + } catch (error) { + console.error('[DB] Error deleting wallet:', error); + return false; + } +} + /** * Get wallet by address */ @@ -355,6 +420,12 @@ export async function migrateWalletsFromJSON(wallets: string[]): Promise const connectionPool = getPool(); for (const wallet of wallets) { + // Validate wallet address length (Ethereum addresses are exactly 42 characters) + if (!wallet || wallet.length !== 42 || !wallet.startsWith('0x')) { + console.log(`[DB] โš ๏ธ Skipping invalid wallet during migration (length: ${wallet?.length}):`, wallet?.substring(0, 50)); + continue; + } + const normalizedWallet = wallet.toLowerCase(); try { await connectionPool.execute( @@ -377,3 +448,283 @@ export async function migrateWalletsFromJSON(wallets: string[]): Promise } } +/** + * Clean up invalid wallet entries (addresses that are not exactly 42 characters) + * This removes transaction hashes and other invalid data that may have been stored + */ +export async function cleanupInvalidWallets(): Promise { + try { + const connectionPool = getPool(); + + // Delete wallets that are not exactly 42 characters (Ethereum addresses are 0x + 40 hex = 42) + const [result] = await connectionPool.execute( + `DELETE FROM wallets WHERE LENGTH(address) != 42 OR address NOT LIKE '0x%'` + ); + + const deletedCount = result.affectedRows || 0; + if (deletedCount > 0) { + console.log(`[DB] ๐Ÿงน Cleaned up ${deletedCount} invalid wallet entries (non-42-char addresses)`); + } else { + console.log(`[DB] โœ… No invalid wallet entries found`); + } + + return deletedCount; + } catch (error) { + console.error('[DB] Error cleaning up invalid wallets:', error); + return 0; + } +} + +/** + * Save a trade to the database + * Uses entry_hash as unique identifier - updates existing trade if entry_hash exists and close info is provided + */ +export async function saveTradeToDB( + walletAddress: string, + entryHash: string, + coin: string, + amount: string | number, + entryPrice: string | number, + entryDate?: Date | string | null, + closePrice?: string | number | null, + closeDate?: Date | string | null, + direction: 'buy' | 'sell' = 'buy' +): Promise { + try { + const normalizedWallet = walletAddress.toLowerCase(); + const normalizedEntryHash = entryHash.toLowerCase(); + const connectionPool = getPool(); + + // Get wallet ID + const wallet = await getWalletFromDB(normalizedWallet); + if (!wallet) { + console.error(`[DB] Wallet not found for trade: ${normalizedWallet}`); + return false; + } + + // If we have close information, check if it's a zero-PnL trade + if (closePrice !== null && closePrice !== undefined && closeDate) { + const entryPriceNum = parseFloat(String(entryPrice)) || 0; + const closePriceNum = parseFloat(String(closePrice)) || 0; + + // If entry_price equals close_price (zero-PnL), delete the trade instead + if (Math.abs(entryPriceNum - closePriceNum) < 0.00000001) { + console.log(`[DB] Deleting zero-PnL trade: entry_hash ${normalizedEntryHash.substring(0, 16)}... (entry: ${entryPriceNum}, close: ${closePriceNum})`); + await connectionPool.execute( + 'DELETE FROM trades WHERE entry_hash = ?', + [normalizedEntryHash] + ); + return true; + } + + // Update existing trade with close information + await connectionPool.execute( + `UPDATE trades + SET close_price = ?, close_date = ?, updated_at = CURRENT_TIMESTAMP + WHERE entry_hash = ?`, + [closePrice, closeDate, normalizedEntryHash] + ); + } else { + // Insert new trade or update if entry_hash exists (for open trades) + await connectionPool.execute( + `INSERT INTO trades (wallet_id, wallet_address, entry_hash, coin, amount, entry_price, entry_date, close_price, close_date, direction) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE + amount = VALUES(amount), + entry_price = VALUES(entry_price), + entry_date = VALUES(entry_date), + direction = VALUES(direction), + updated_at = CURRENT_TIMESTAMP`, + [ + wallet.id, + normalizedWallet, + normalizedEntryHash, + coin, + amount, + entryPrice, + entryDate || null, + closePrice || null, + closeDate || null, + direction, + ] + ); + } + + return true; + } catch (error) { + console.error('[DB] Error saving trade:', error); + return false; + } +} + +/** + * Calculate PnL from stored trades for a wallet + * Returns: { realizedPnl, unrealizedPnl, totalPnl, closedTradesCount, openTradesCount } + */ +export async function calculatePnLFromTrades(walletAddress: string): Promise<{ + realizedPnl: string; + unrealizedPnl: string; + totalPnl: string; + closedTradesCount: number; + openTradesCount: number; +}> { + try { + const trades = await getTradesForWallet(walletAddress); + + let realizedPnl = 0; + let unrealizedPnl = 0; + let closedTradesCount = 0; + let openTradesCount = 0; + + for (const trade of trades) { + const amount = parseFloat(trade.amount) || 0; + const entryPrice = parseFloat(trade.entry_price) || 0; + + if (trade.close_date && trade.close_price) { + // Closed trade - calculate realized PnL + const closePrice = parseFloat(trade.close_price) || 0; + const tradePnl = (closePrice - entryPrice) * amount; + realizedPnl += tradePnl; + closedTradesCount++; + } else { + // Open trade - we'll need current price for unrealized PnL + // For now, we'll set it to 0 and it can be updated with current prices if needed + openTradesCount++; + // Note: Unrealized PnL requires current market price, which we'd need to fetch + // For now, we'll leave it at 0 or calculate it separately if current prices are available + } + } + + return { + realizedPnl: realizedPnl.toFixed(8), + unrealizedPnl: unrealizedPnl.toFixed(8), + totalPnl: (realizedPnl + unrealizedPnl).toFixed(8), + closedTradesCount, + openTradesCount, + }; + } catch (error) { + console.error('[DB] Error calculating PnL from trades:', error); + return { + realizedPnl: '0', + unrealizedPnl: '0', + totalPnl: '0', + closedTradesCount: 0, + openTradesCount: 0, + }; + } +} + +/** + * Get all trades for a wallet + */ +export async function getTradesForWallet(walletAddress: string): Promise { + try { + const normalizedWallet = walletAddress.toLowerCase(); + const connectionPool = getPool(); + + const [rows] = await connectionPool.execute( + `SELECT + id, + wallet_address, + entry_hash, + entry_date, + close_date, + coin, + amount, + entry_price, + close_price, + direction, + created_at, + updated_at + FROM trades + WHERE wallet_address = ? + ORDER BY entry_date DESC, created_at DESC`, + [normalizedWallet] + ); + + return rows as any[]; + } catch (error) { + console.error('[DB] Error getting trades for wallet:', error); + return []; + } +} + +/** + * Get all open trades (where close_date IS NULL) + */ +export async function getOpenTrades(): Promise { + try { + const connectionPool = getPool(); + + const [rows] = await connectionPool.execute( + `SELECT + id, + wallet_address, + entry_hash, + entry_date, + coin, + amount, + entry_price + FROM trades + WHERE close_date IS NULL + ORDER BY entry_date DESC` + ); + + return rows as any[]; + } catch (error) { + console.error('[DB] Error getting open trades:', error); + return []; + } +} + +/** + * Update trade close information by entry_hash + * If entry_price equals close_price (zero-PnL), deletes the trade instead + */ +export async function updateTradeClose( + entryHash: string, + closePrice: string | number, + closeDate: Date | string +): Promise { + try { + const normalizedEntryHash = entryHash.toLowerCase(); + const connectionPool = getPool(); + + // First, get the trade to check entry_price + const [tradeRows] = await connectionPool.execute( + 'SELECT entry_price FROM trades WHERE entry_hash = ?', + [normalizedEntryHash] + ); + + if (tradeRows.length === 0) { + return false; // Trade not found + } + + const entryPrice = parseFloat(tradeRows[0].entry_price) || 0; + const closePriceNum = parseFloat(String(closePrice)) || 0; + + // If entry_price equals close_price (zero-PnL), delete the trade instead + if (Math.abs(entryPrice - closePriceNum) < 0.00000001) { + console.log(`[TRADES] Deleting zero-PnL trade: entry_hash ${normalizedEntryHash.substring(0, 16)}... (entry: ${entryPrice}, close: ${closePriceNum})`); + await connectionPool.execute( + 'DELETE FROM trades WHERE entry_hash = ?', + [normalizedEntryHash] + ); + return true; + } + + // Otherwise, update with close information + const [result] = await connectionPool.execute( + `UPDATE trades + SET close_price = ?, close_date = ?, updated_at = CURRENT_TIMESTAMP + WHERE entry_hash = ? AND close_date IS NULL`, + [closePrice, closeDate, normalizedEntryHash] + ); + + return result.affectedRows > 0; + } catch (error) { + console.error('[DB] Error updating trade close:', error); + return false; + } +} + diff --git a/src/index.ts b/src/index.ts index 0023721..bf1ef89 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,11 +12,14 @@ dotenv.config({ path: envPath }); // .env.local values will override .env console.log('[ENV] Environment variables loaded'); console.log('[ENV] DB_HOST:', process.env.DB_HOST || 'not set'); console.log('[ENV] DB_PORT:', process.env.DB_PORT || 'not set'); +console.log('[ENV] DB_USER:', process.env.DB_USER || 'not set (will default to root)'); console.log('[ENV] DB_NAME:', process.env.DB_NAME || 'not set'); +console.log('[ENV] DB_PASSWORD:', process.env.DB_PASSWORD ? '***set***' : 'not set'); import express, { Request, Response } from 'express'; import { HttpTransport, InfoClient, SubscriptionClient, WebSocketTransport } from '@nktkas/hyperliquid'; import * as fs from 'fs'; +import * as crypto from 'crypto'; import WebSocket from 'ws'; import { initDatabase, @@ -24,8 +27,13 @@ import { loadWalletsFromDB, addWalletToDB, getAllWalletsFromDB, - savePnLSnapshot, migrateWalletsFromJSON, + cleanupInvalidWallets, + deleteWalletFromDB, + saveTradeToDB, + getTradesForWallet, + getOpenTrades, + updateTradeClose, } from './database'; // Set WebSocket as global for @nktkas/rews to use in Node.js environment @@ -39,6 +47,246 @@ app.use(express.json()); const client = new InfoClient({ transport: new HttpTransport() }); +// Price cache - stores current prices for all coins (updated every 5 seconds) +const priceCache = new Map(); +let priceCacheLastUpdate = 0; +const PRICE_CACHE_UPDATE_INTERVAL_MS = 5 * 1000; // 5 seconds + +/** + * Update the price cache by fetching all current prices from Hyperliquid + */ +async function updatePriceCache(): Promise { + try { + console.log('[PRICE CACHE] Updating price cache...'); + + // Get all mids (market prices) from Hyperliquid + const mids = await client.allMids(); + + if (mids && typeof mids === 'object') { + priceCache.clear(); // Clear old cache + + if (Array.isArray(mids)) { + // Handle array format + for (const item of mids) { + if (item && typeof item === 'object') { + const coin = item.coin || item.symbol || item.name; + const price = item.price || item.mid || item.px; + if (coin && price !== undefined) { + const coinUpper = coin.toUpperCase(); + const priceNum = typeof price === 'number' ? price : parseFloat(String(price)); + if (!isNaN(priceNum) && priceNum > 0) { + priceCache.set(coinUpper, priceNum); + } + } + } + } + } else { + // Handle object format - iterate over all keys + for (const key in mids) { + const coinUpper = key.toUpperCase(); + let price: any = (mids as any)[key]; + + // If price is nested object, extract it + if (price && typeof price === 'object') { + price = price.mid || price.price || price.px || price.markPx; + } + + if (price !== null && price !== undefined) { + const priceNum = typeof price === 'number' ? price : parseFloat(String(price)); + if (!isNaN(priceNum) && priceNum > 0) { + priceCache.set(coinUpper, priceNum); + } + } + } + } + + priceCacheLastUpdate = Date.now(); + console.log(`[PRICE CACHE] โœ… Updated cache with ${priceCache.size} coin prices`); + } + } catch (error) { + console.error('[PRICE CACHE] Error updating price cache:', error); + } +} + +/** + * Start the price cache update interval + * Updates prices every 5 seconds + */ +function startPriceCacheUpdates(): void { + // Update immediately on startup + updatePriceCache().catch(err => { + console.error('[PRICE CACHE] Error in initial price cache update:', err); + }); + + // Then update every 5 seconds + setInterval(() => { + updatePriceCache().catch(err => { + console.error('[PRICE CACHE] Error in scheduled price cache update:', err); + }); + }, PRICE_CACHE_UPDATE_INTERVAL_MS); + + console.log('[PRICE CACHE] Started price cache updates (every 5 seconds)'); +} + +/** + * Get current prices for coins from Hyperliquid market data + * Returns a map of coin symbol (uppercase) to current price + * NOTE: This function is now only used as a fallback. Use priceCache directly instead. + */ +async function getCurrentPrices(coins: string[]): Promise> { + const priceMap = new Map(); + + try { + // Get all mids (market prices) from Hyperliquid + const mids = await client.allMids(); + + // Log the structure to debug + console.log('[PRICE] allMids response type:', typeof mids); + console.log('[PRICE] allMids response:', JSON.stringify(mids, null, 2).substring(0, 500)); + + if (mids && typeof mids === 'object') { + // Handle different response formats + // Format 1: Object with coin symbols as keys: { "BTC": "43250.5", "ETH": "2650.25" } + // Format 2: Array of objects: [{ coin: "BTC", price: 43250.5 }, ...] + // Format 3: Object with nested structure + + if (Array.isArray(mids)) { + // Handle array format + for (const item of mids) { + if (item && typeof item === 'object') { + const coin = item.coin || item.symbol || item.name; + const price = item.price || item.mid || item.px; + if (coin && price !== undefined) { + const coinUpper = coin.toUpperCase(); + const priceNum = typeof price === 'number' ? price : parseFloat(String(price)); + if (!isNaN(priceNum)) { + priceMap.set(coinUpper, priceNum); + } + } + } + } + } else { + // Handle object format - try various key patterns + for (const coin of coins) { + const coinUpper = coin.toUpperCase(); + let price: any = null; + + // Try direct key match + price = (mids as any)[coinUpper] || (mids as any)[coin]; + + // Try with different suffixes + if (!price) { + price = (mids as any)[`${coinUpper}-USDC`] || (mids as any)[`${coin}-USDC`]; + } + + // Try nested structure (e.g., { BTC: { mid: 43250.5 } }) + if (!price && (mids as any)[coinUpper] && typeof (mids as any)[coinUpper] === 'object') { + price = (mids as any)[coinUpper].mid || (mids as any)[coinUpper].price || (mids as any)[coinUpper].px; + } + + if (price !== null && price !== undefined) { + const priceNum = typeof price === 'number' ? price : parseFloat(String(price)); + if (!isNaN(priceNum) && priceNum > 0) { + priceMap.set(coinUpper, priceNum); + console.log(`[PRICE] Found price for ${coinUpper}: ${priceNum}`); + } + } + } + } + } + + console.log(`[PRICE] Fetched prices for ${priceMap.size}/${coins.length} coins`); + + // If we didn't get prices, try alternative method: metaAndAssetCtxs + if (priceMap.size === 0) { + console.log('[PRICE] Trying alternative method: metaAndAssetCtxs'); + try { + const meta = await client.metaAndAssetCtxs(); + console.log('[PRICE] metaAndAssetCtxs response:', JSON.stringify(meta, null, 2).substring(0, 500)); + + // metaAndAssetCtxs might have price data in a different structure + if (meta && typeof meta === 'object') { + // Try to extract prices from metadata + const assetCtxs = (meta as any).assetCtxs || (meta as any).ctxs || meta; + + if (assetCtxs && typeof assetCtxs === 'object') { + for (const coin of coins) { + const coinUpper = coin.toUpperCase(); + + // Try to find price in asset context + for (const key in assetCtxs) { + if (key.toUpperCase() === coinUpper || key.includes(coinUpper)) { + const ctx = assetCtxs[key]; + if (ctx && typeof ctx === 'object') { + const price = ctx.mid || ctx.price || ctx.px || ctx.markPx; + if (price !== undefined) { + const priceNum = typeof price === 'number' ? price : parseFloat(String(price)); + if (!isNaN(priceNum) && priceNum > 0) { + priceMap.set(coinUpper, priceNum); + console.log(`[PRICE] Found price for ${coinUpper} via metaAndAssetCtxs: ${priceNum}`); + break; + } + } + } + } + } + } + } + } + } catch (altError) { + console.error('[PRICE] Error with alternative method:', altError); + } + } + } catch (error) { + console.error('[PRICE] Error fetching current prices:', error); + console.error('[PRICE] Error stack:', (error as Error).stack); + } + + return priceMap; +} + +/** + * Enhance trades with current prices and calculate PnL per trade + * Uses the price cache instead of making API calls + */ +async function enhanceTradesWithCurrentPrices(trades: any[]): Promise { + if (trades.length === 0) { + return trades; + } + + // Add current price and PnL to each trade from cache + return trades.map(trade => { + const coinUpper = trade.coin?.toUpperCase() || ''; + const currentPrice = coinUpper ? (priceCache.get(coinUpper) || null) : null; + + // Calculate PnL per trade + let tradePnl: number | null = null; + if (currentPrice !== null && trade.entry_price) { + const entryPrice = parseFloat(String(trade.entry_price)) || 0; + const amount = parseFloat(String(trade.amount)) || 0; + const direction = trade.direction || 'buy'; + + if (entryPrice > 0 && amount > 0 && currentPrice > 0) { + if (direction === 'buy') { + // For buy trades: profit if current_price > entry_price + // PnL = (current_price - entry_price) * amount + tradePnl = (currentPrice - entryPrice) * amount; + } else if (direction === 'sell') { + // For sell trades: profit if entry_price > current_price + // PnL = (entry_price - current_price) * amount + tradePnl = (entryPrice - currentPrice) * amount; + } + } + } + + return { + ...trade, + current_price: currentPrice, + pnl: tradePnl !== null ? tradePnl.toFixed(8) : null, + }; + }); +} + // Create WebSocket transport with logging const wsTransport = new WebSocketTransport(); @@ -94,6 +342,10 @@ async function initializeDatabase(): Promise { return; } + // Clean up invalid wallet entries (transaction hashes, etc.) + console.log('[INIT] Cleaning up invalid wallet entries...'); + await cleanupInvalidWallets(); + // Load wallets from database console.log('[INIT] Loading wallets from database...'); const wallets = await loadWalletsFromDB(); @@ -129,6 +381,152 @@ async function initializeDatabase(): Promise { } } +/** + * Determine if a trade is a buy or sell based on trade data + * Returns 'buy' or 'sell' + * For Hyperliquid: side='B' means buy (Bid), side='A' means sell (Ask) + */ +function determineTradeDirection(tradeData: any): 'buy' | 'sell' { + // Check the 'side' field - Hyperliquid uses 'B' for buy (Bid) and 'A' for sell (Ask) + if (tradeData.side === 'B' || tradeData.side === 'b') { + return 'buy'; + } + if (tradeData.side === 'A' || tradeData.side === 'a') { + return 'sell'; + } + + // Fallback: Check explicit direction fields (case-insensitive strings) + const sideStr = String(tradeData.side || tradeData.direction || tradeData.type || '').toLowerCase(); + if (sideStr === 'buy' || sideStr === 'bid' || sideStr === 'long') { + return 'buy'; + } + if (sideStr === 'sell' || sideStr === 'ask' || sideStr === 'short') { + return 'sell'; + } + + // Check if it's a long (buy) or short (sell) + if (tradeData.isLong === true || tradeData.long === true) { + return 'buy'; + } + if (tradeData.isLong === false || tradeData.short === true) { + return 'sell'; + } + + // Check if size/amount is positive (buy) or negative (sell) + const size = tradeData.sz || tradeData.size || tradeData.amount || tradeData.quantity || '0'; + const sizeNum = parseFloat(String(size)) || 0; + if (sizeNum < 0) { + return 'sell'; + } + if (sizeNum > 0) { + // For Hyperliquid, positive size typically means buy, but side field is more reliable + return 'buy'; + } + + // Default to buy if we can't determine + console.log('[TRADE DIRECTION] โš ๏ธ Could not determine direction from trade data:', { + side: tradeData.side, + direction: tradeData.direction, + size: tradeData.sz || tradeData.size + }); + return 'buy'; +} + +/** + * Extract and save trade data (buy or sell) + * Saves all trades with direction (buy/sell) + */ +async function processAndSaveTrade(tradeData: any, coin: string, walletAddress: string): Promise { + try { + if (!walletAddress || walletAddress.length !== 42 || !walletAddress.startsWith('0x')) { + return; // Invalid wallet address + } + + // Determine if this is a buy or sell + const direction = determineTradeDirection(tradeData); + + // Extract trade identifier + const tradeHash = tradeData.hash || + tradeData.txHash || + tradeData.transactionHash || + tradeData.oid || + tradeData.orderId || + tradeData.tradeId || + tradeData.id || + null; + + // Generate hash if not provided + let finalTradeHash: string = ''; + if (!tradeHash) { + const timestamp = tradeData.time || tradeData.timestamp || Date.now(); + const price = tradeData.px || tradeData.price || '0'; + const hashInput = `${walletAddress}-${coin}-${timestamp}-${price}-${direction}`; + const hash = crypto.createHash('sha256').update(hashInput).digest('hex'); + finalTradeHash = `0x${hash.substring(0, 64)}`; + } else { + finalTradeHash = tradeHash; + } + + // Extract trade fields + const amount = tradeData.sz || tradeData.size || tradeData.amount || tradeData.quantity || '0'; + + // Extract price - for Hyperliquid, 'px' is the execution price (most reliable) + // Use px (execution price) as primary, avoid closePrice which could be wrong + let price: string | number = tradeData.px || tradeData.price || '0'; + + // Validate price - must be a valid number > 0 + const priceNum = parseFloat(String(price)); + if (isNaN(priceNum) || priceNum <= 0) { + console.log(`[TRADE] โš ๏ธ Invalid price for trade: ${price} (px: ${tradeData.px}, price: ${tradeData.price})`); + console.log(`[TRADE] Trade data keys:`, Object.keys(tradeData)); + // Try alternative price fields only if px/price are invalid + const fallbackPrice = tradeData.entryPrice || tradeData.openPrice; + const fallbackPriceNum = fallbackPrice ? parseFloat(String(fallbackPrice)) : 0; + if (isNaN(fallbackPriceNum) || fallbackPriceNum <= 0) { + console.log(`[TRADE] โš ๏ธ Skipping trade with invalid price: ${walletAddress.substring(0, 10)}... ${coin}`); + return; // Skip trade if no valid price found + } + price = String(fallbackPriceNum); + } else { + price = String(priceNum); + } + + // Extract trade date + let tradeDate: Date | null = null; + if (tradeData.time || tradeData.timestamp || tradeData.date) { + const timeValue = tradeData.time || tradeData.timestamp || tradeData.date; + if (typeof timeValue === 'number') { + tradeDate = new Date(timeValue > 1e12 ? timeValue : timeValue * 1000); + } else if (typeof timeValue === 'string') { + tradeDate = new Date(timeValue); + } + } + + if (!tradeDate) { + tradeDate = new Date(); + } + + // Save trade to database (all trades are stored as individual buy/sell events) + // For buy/sell model, we don't need entry/close dates - just the trade date + await saveTradeToDB( + walletAddress, + finalTradeHash, + coin, + Math.abs(parseFloat(String(amount))), // Store absolute amount + price, + tradeDate, + null, // no close price for buy/sell model + null, // no close date for buy/sell model + direction + ); + + console.log(`[TRADE] โœ… Saved ${direction} trade: ${walletAddress.substring(0, 10)}... ${coin} (price: ${price}, amount: ${amount})`); + } catch (error) { + console.error(`[TRADE] Error processing trade for wallet ${walletAddress}:`, error); + console.error(`[TRADE] Error stack:`, (error as Error).stack); + } +} + // Add wallet to tracking (both in-memory and database) async function addWallet(wallet: string): Promise { if (!wallet || typeof wallet !== 'string' || !wallet.startsWith('0x')) { @@ -136,6 +534,12 @@ async function addWallet(wallet: string): Promise { return; } + // Validate wallet address length (Ethereum addresses are exactly 42 characters: 0x + 40 hex chars) + if (wallet.length !== 42) { + console.log(`[WALLET] โš ๏ธ Invalid wallet length (expected 42, got ${wallet.length}), skipping:`, wallet); + return; + } + const normalizedWallet = wallet.toLowerCase(); // Add to in-memory cache @@ -147,8 +551,7 @@ async function addWallet(wallet: string): Promise { try { const success = await addWalletToDB(normalizedWallet); if (success) { - console.log(`[WALLET] โœ… Added/updated wallet in database: ${normalizedWallet}`); - console.log(`[WALLET] ๐Ÿ“Š Total tracked wallets: ${trackedWallets.size}`); + } else { console.error(`[WALLET] โŒ Failed to save wallet to database: ${normalizedWallet}`); } @@ -160,65 +563,251 @@ async function addWallet(wallet: string): Promise { interface WalletPnL { wallet: string; pnl: string; - accountValue?: string; - unrealizedPnl?: string; - realizedPnl?: string; + realizedPnl: string; + unrealizedPnl: string; + closedTradesCount: number; + openTradesCount: number; } /** - * Get PnL for a single wallet + * Get PnL for a single wallet calculated from stored trades + * @param wallet - Wallet address + * @param closedOnly - If true, only calculate PnL from closed trades (exclude open trades) */ -async function getWalletPnL(wallet: string): Promise { +async function getWalletPnL(wallet: string, closedOnly: boolean = false): Promise { try { - const state = await client.clearinghouseState({ user: wallet }); - - // Extract data from marginSummary - const marginSummary = state.marginSummary; - const accountValue = marginSummary?.accountValue || '0'; - - // Calculate unrealized PnL from assetPositions - // PnL is typically the difference between current value and entry value - let totalUnrealizedPnl = '0'; - - if (state.assetPositions && Array.isArray(state.assetPositions)) { - totalUnrealizedPnl = state.assetPositions.reduce((sum, position: any) => { - // Access unrealizedPnl from position structure - // The structure may vary, so we check multiple possible paths - const positionPnl = - position.position?.unrealizedPnl || - position.unrealizedPnl || - (position.position?.unrealizedPnl || '0'); - const pnlValue = parseFloat(positionPnl) || 0; - return (parseFloat(sum) + pnlValue).toString(); - }, '0'); + // Validate wallet address + if (!wallet || wallet.length !== 42 || !wallet.startsWith('0x')) { + return null; } - // Use totalRawUsd from marginSummary as alternative PnL indicator - // totalRawUsd represents total account value including unrealized PnL - // We can use it to derive PnL if needed - const totalRawUsd = marginSummary?.totalRawUsd || '0'; + // Get all trades for this wallet + const trades = await getTradesForWallet(wallet); - // Primary PnL metric: use unrealized PnL from positions, fallback to calculated value - const pnl = totalUnrealizedPnl !== '0' ? totalUnrealizedPnl : totalRawUsd; + // Filter trades if closedOnly is true + const filteredTrades = closedOnly + ? trades.filter((trade: any) => trade.close_date !== null) + : trades; + + let realizedPnl = 0; + let unrealizedPnl = 0; + let closedTradesCount = 0; + let openTradesCount = 0; + + // Get current positions to fetch current prices for open trades (only if not closedOnly) + let currentPositions: Map = new Map(); + if (!closedOnly) { + try { + const state = await client.clearinghouseState({ user: wallet }); + if (state.assetPositions && Array.isArray(state.assetPositions)) { + for (const position of state.assetPositions) { + const coin = position.position?.coin || null; + if (coin) { + currentPositions.set(coin.toUpperCase(), position); + } + } + } + } catch (error) { + // If we can't get current positions, we'll just calculate realized PnL + console.log(`[PnL] Could not fetch current positions for ${wallet}, calculating realized PnL only`); + } + } + + // Calculate PnL from trades + for (const trade of filteredTrades) { + const amount = parseFloat(trade.amount) || 0; + const entryPrice = parseFloat(trade.entry_price) || 0; + + if (trade.close_date && trade.close_price) { + // Closed trade - calculate realized PnL: (close_price - entry_price) * amount + const closePrice = parseFloat(trade.close_price) || 0; + const tradePnl = (closePrice - entryPrice) * amount; + realizedPnl += tradePnl; + closedTradesCount++; + } else if (!closedOnly) { + // Open trade - calculate unrealized PnL using current price (only if not closedOnly) + openTradesCount++; + const coin = trade.coin.toUpperCase(); + const position = currentPositions.get(coin); + if (position) { + // Get current price from position (entryPx is the average entry, but we need mark price) + // Try to get mark price or use entryPx as approximation + const currentPrice = parseFloat(position.position?.markPx || position.position?.entryPx || entryPrice) || entryPrice; + const tradeUnrealizedPnl = (currentPrice - entryPrice) * amount; + unrealizedPnl += tradeUnrealizedPnl; + } + // If position not found, unrealized PnL remains 0 + } + } return { wallet, - pnl, - accountValue, - unrealizedPnl: totalUnrealizedPnl, + pnl: (realizedPnl + unrealizedPnl).toFixed(8), + realizedPnl: realizedPnl.toFixed(8), + unrealizedPnl: unrealizedPnl.toFixed(8), + closedTradesCount, + openTradesCount, }; } catch (error) { - console.error(`Error fetching PnL for wallet ${wallet}:`, error); + const errMsg = (error as Error)?.message || ''; + if (errMsg.includes('Expected 42') && errMsg.includes('received 66')) { + await deleteWalletFromDB(wallet); + console.log('[WALLET] Removed the item with invalid length.'); + } else { + console.error(`Error calculating PnL for wallet ${wallet}:`, error); + } return null; } } +const PNL_UPDATE_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + /** - * Start tracking trades to automatically collect wallet addresses + * Detect and update closed trades by comparing open trades with current positions + */ +async function detectAndUpdateClosedTrades(): Promise { + try { + const openTrades = await getOpenTrades(); + if (openTrades.length === 0) { + return 0; + } + + console.log(`[TRADES] Checking ${openTrades.length} open trades for closures...`); + + // Group open trades by wallet + const tradesByWallet = new Map(); + for (const trade of openTrades) { + const wallet = trade.wallet_address.toLowerCase(); + if (!tradesByWallet.has(wallet)) { + tradesByWallet.set(wallet, []); + } + tradesByWallet.get(wallet)!.push(trade); + } + + let closedCount = 0; + + // Check each wallet's current positions + for (const [wallet, trades] of tradesByWallet.entries()) { + try { + const state = await client.clearinghouseState({ user: wallet }); + + // Get current open positions - map coin to position data + const currentPositions = new Map(); + if (state.assetPositions && Array.isArray(state.assetPositions)) { + for (const position of state.assetPositions) { + // Position structure: { type: "oneWay", position: { coin: string, szi: string, ... } } + const coin = position.position?.coin || null; + const size = position.position?.szi || '0'; + if (coin && parseFloat(size) !== 0) { + currentPositions.set(coin.toUpperCase(), position); + } + } + } + + // Check each open trade - if coin is not in current positions, it's closed + for (const trade of trades) { + const coin = trade.coin.toUpperCase(); + if (!currentPositions.has(coin)) { + // Trade is closed - get close price from market or use entry price as fallback + // For now, we'll use entry price as close price (can be improved with market data) + const closePrice = trade.entry_price; + const closeDate = new Date(); + + const updated = await updateTradeClose(trade.entry_hash, closePrice, closeDate); + if (updated) { + closedCount++; + console.log(`[TRADES] โœ… Closed trade: ${wallet.substring(0, 10)}... ${coin} (entry_hash: ${trade.entry_hash.substring(0, 16)}...)`); + } + } + } + } catch (error) { + console.error(`[TRADES] Error checking wallet ${wallet} for closed trades:`, error); + // Continue with next wallet + } + } + + if (closedCount > 0) { + console.log(`[TRADES] โœ… Updated ${closedCount} closed trades`); + } + + return closedCount; + } catch (error) { + console.error('[TRADES] Error detecting closed trades:', error); + return 0; + } +} + +/** + * Calculate PnL for all tracked wallets from stored trades. + * Used by /api/wallets/tracked and by the 5-minute background job. + */ +async function fetchAndUpdateAllPnL(): Promise<(WalletPnL | null)[]> { + const wallets = await loadWalletsFromDB(); + if (wallets.length === 0) { + console.log('[PnL] No wallets to update'); + return []; + } + console.log(`[PnL] Calculating PnL from trades for ${wallets.length} wallets`); + let completed = 0; + const total = wallets.length; + const walletPnLs = await Promise.all( + wallets.map(async (wallet) => { + const pnlData = await getWalletPnL(wallet); + completed++; + console.log(`[PnL] Progress: ${completed}/${total} - ${wallet.substring(0, 14)}...`); + return pnlData; + }) + ); + const valid = walletPnLs.filter((w): w is WalletPnL => w !== null); + console.log(`[PnL] Done. Calculated PnL for ${valid.length}/${total} wallets.`); + return walletPnLs; +} + +/** + * Check if a trade event represents a closing event (position closure) + * This function determines if a trade is closing a position + */ +function isClosingEvent(tradeData: any): boolean { + // Check various indicators that this is a closing event + // 1. Check if there's explicit close information + if (tradeData.closePrice !== undefined || tradeData.closeDate !== undefined || tradeData.closedPnl !== undefined) { + return true; + } + + // 2. Check if side indicates closing (opposite side from position) + if (tradeData.side === 'close' || tradeData.isClose === true || tradeData.closed === true) { + return true; + } + + // 3. Check if closedPnl field exists (indicates position was closed) + if (tradeData.closedPnl !== undefined && tradeData.closedPnl !== null) { + return true; + } + + // 4. For Hyperliquid, check if it's a reduce/close fill + if (tradeData.fillType === 'close' || tradeData.reduceOnly === true) { + return true; + } + + // If none of the above, it's likely an opening trade + return false; +} + +/** + * Determine if a trade is an opening event (position opening) + */ +function isOpeningEvent(tradeData: any): boolean { + // If it's not a closing event and has price/amount data, it's likely an opening + return !isClosingEvent(tradeData); +} + +/** + * Start tracking trade events (both opening and closing) to automatically collect wallet addresses + * Listens to all trade events and processes both opens and closes */ async function startTradeTracking(): Promise { try { - console.log('[TRACKING] Starting trade tracking...'); + console.log('[TRACKING] Starting trade tracking (opens and closes)...'); console.log('[TRACKING] WebSocket transport:', wsTransport); // Wait for WebSocket connection to be ready @@ -236,113 +825,116 @@ async function startTradeTracking(): Promise { throw error; } - // Common trading pairs to track - const coinsToTrack = ['ETH', 'BTC', 'SOL', 'ARB', 'AVAX']; - console.log(`[TRACKING] Will subscribe to trades for: ${coinsToTrack.join(', ')}`); + // Subscribe to all trades and process both opens and closes + console.log(`[TRACKING] Subscribing to all trade events (opens and closes)...`); - // Subscribe to trades for multiple coins - for (const coin of coinsToTrack) { - try { - console.log(`[TRACKING] Attempting to subscribe to ${coin} trades...`); - - const subscription = await subscriptionClient.trades({ coin }, (trade: any) => { - console.log(`[TRADE] Received trade data for ${coin}:`, JSON.stringify(trade, null, 2)); + try { + console.log(`[TRACKING] Attempting to subscribe to all trade events...`); + + // Subscribe to trades for multiple coins - process both opens and closes + const coinsToTrack = ['ETH', 'BTC', 'SOL', 'ARB', 'AVAX', 'BNB', 'MATIC', 'ADA', 'DOGE', 'XRP']; + console.log(`[TRACKING] Subscribing to trades for: ${coinsToTrack.join(', ')}`); + + for (const coin of coinsToTrack) { + try { + console.log(`[TRACKING] Subscribing to ${coin} trade events...`); - try { - // Extract wallet addresses from trade data - // Trade structure may vary - console.log(`[TRADE] Processing trade data for ${coin}, structure:`, Object.keys(trade)); - - // Check if trade has a 'trades' array or is a single trade - const trades = trade.trades || (Array.isArray(trade) ? trade : [trade]); - console.log(`[TRADE] Found ${trades.length} trade(s) to process`); - - trades.forEach((t: any, index: number) => { - console.log(`[TRADE] Processing trade ${index + 1}/${trades.length}`); - console.log(`[TRADE] Trade ${index + 1} keys:`, Object.keys(t)); + const subscription = await subscriptionClient.trades({ coin }, (trade: any) => { + try { + // Check if trade has a 'trades' array or is a single trade + const trades = trade.trades || (Array.isArray(trade) ? trade : [trade]); - // Collect all potential wallet addresses found - const foundWallets: string[] = []; - - // Check various possible fields for wallet address - const wallet = t.user || t.userAddress || t.account || t.side?.user || t.oid?.user || t.closedPnl?.user; - console.log(`[TRADE] Extracted wallet from primary fields:`, wallet); - - // Scan all fields for potential wallet addresses - const scanForWallets = (obj: any, prefix = ''): void => { - if (!obj || typeof obj !== 'object') return; + trades.forEach((t: any) => { + // Collect all potential wallet addresses found + const foundWallets: string[] = []; - Object.keys(obj).forEach(key => { - const value = obj[key]; - const fullPath = prefix ? `${prefix}.${key}` : key; + // Check various possible fields for wallet address + const wallet = t.user || t.userAddress || t.account || t.side?.user || t.oid?.user || t.closedPnl?.user || t.address; + + // Scan all fields for potential wallet addresses + const scanForWallets = (obj: any, prefix = ''): void => { + if (!obj || typeof obj !== 'object') return; - if (typeof value === 'string' && value.startsWith('0x') && value.length >= 42) { - console.log(`[TRADE] ๐Ÿ” Found potential wallet at '${fullPath}': ${value}`); - foundWallets.push(value); - } else if (typeof value === 'object' && value !== null && !Array.isArray(value)) { - scanForWallets(value, fullPath); - } else if (Array.isArray(value)) { - value.forEach((item, idx) => { - if (typeof item === 'object' && item !== null) { - scanForWallets(item, `${fullPath}[${idx}]`); - } else if (typeof item === 'string' && item.startsWith('0x') && item.length >= 42) { - console.log(`[TRADE] ๐Ÿ” Found potential wallet at '${fullPath}[${idx}]': ${item}`); - foundWallets.push(item); - } - }); + Object.keys(obj).forEach(key => { + const value = obj[key]; + const fullPath = prefix ? `${prefix}.${key}` : key; + + if (typeof value === 'string' && value.startsWith('0x') && value.length === 42) { + foundWallets.push(value); + } else if (typeof value === 'object' && value !== null && !Array.isArray(value)) { + scanForWallets(value, fullPath); + } else if (Array.isArray(value)) { + value.forEach((item, idx) => { + if (typeof item === 'object' && item !== null) { + scanForWallets(item, `${fullPath}[${idx}]`); + } else if (typeof item === 'string' && item.startsWith('0x') && item.length === 42) { + foundWallets.push(item); + } + }); + } + }); + }; + + // Scan the entire trade object + scanForWallets(t); + + // Add primary wallet if found + if (wallet && typeof wallet === 'string' && wallet.length === 42 && wallet.startsWith('0x')) { + foundWallets.push(wallet); + } + + // Add maker/taker wallets if available + if (t.maker && typeof t.maker === 'string' && t.maker.startsWith('0x') && t.maker.length === 42) { + foundWallets.push(t.maker); + } + if (t.taker && typeof t.taker === 'string' && t.taker.startsWith('0x') && t.taker.length === 42) { + foundWallets.push(t.taker); + } + + // Remove duplicates + const uniqueWallets = [...new Set(foundWallets.map(w => w.toLowerCase()))]; + + if (uniqueWallets.length === 0) { + console.log(`[TRADE] โš ๏ธ No wallet addresses found in trade event`); + return; + } + + // Process wallets in parallel - save all trades as buy/sell + Promise.all(uniqueWallets.map(async (w) => { + try { + // Add wallet to tracking (both in-memory and database) + await addWallet(w); + + // Process and save trade (buy or sell) + await processAndSaveTrade(t, coin, w); + } catch (error) { + console.error(`[TRADE] Error processing wallet ${w}:`, error); } - }); - }; - - // Scan the entire trade object - scanForWallets(t); - - // Add all found wallets (remove duplicates) - const uniqueWallets = [...new Set(foundWallets)]; - console.log(`[TRADE] ๐Ÿ“‹ Found ${uniqueWallets.length} unique wallet(s):`, uniqueWallets); - - uniqueWallets.forEach(w => { - addWallet(w).catch(err => { - console.error(`[WALLET] Error adding wallet ${w}:`, err); + })).catch(error => { + console.error('[TRADE] Error in Promise.all for wallets:', error); }); }); - - // Also check for maker/taker if available - if (t.maker && typeof t.maker === 'string' && t.maker.startsWith('0x')) { - console.log(`[WALLET] Adding wallet from maker field: ${t.maker}`); - addWallet(t.maker).catch(err => { - console.error(`[WALLET] Error adding maker wallet:`, err); - }); - } - if (t.taker && typeof t.taker === 'string' && t.taker.startsWith('0x')) { - console.log(`[WALLET] Adding wallet from taker field: ${t.taker}`); - addWallet(t.taker).catch(err => { - console.error(`[WALLET] Error adding taker wallet:`, err); - }); - } - - if (uniqueWallets.length === 0) { - console.log(`[TRADE] โš ๏ธ No wallet addresses found in this trade`); - console.log(`[TRADE] Full trade data:`, JSON.stringify(t, null, 2)); - } - }); - } catch (error) { - console.error('[TRADE] Error processing trade:', error); - console.error('[TRADE] Error stack:', (error as Error).stack); - } - }); - - console.log(`[TRACKING] Successfully subscribed to ${coin} trades, subscription:`, subscription); - } catch (error) { - console.error(`[TRACKING] Error subscribing to ${coin} trades:`, error); - console.error(`[TRACKING] Error details:`, (error as Error).stack); + } catch (error) { + console.error('[TRADE CLOSE] Error processing closing event:', error); + console.error('[TRADE CLOSE] Error stack:', (error as Error).stack); + } + }); + + console.log(`[TRACKING] โœ… Successfully subscribed to ${coin} trade events`); + } catch (error) { + console.error(`[TRACKING] โŒ Error subscribing to ${coin} trade events:`, error); + console.error(`[TRACKING] Error details:`, (error as Error).stack); + } } + + console.log('[TRACKING] โœ… Trade tracking initialization complete (buys and sells)'); + console.log(`[TRACKING] Currently tracking ${trackedWallets.size} wallets`); + } catch (error) { + console.error('[TRACKING] โŒ Fatal error starting trade tracking:', error); + console.error('[TRACKING] Error stack:', (error as Error).stack); } - - console.log('[TRACKING] Trade tracking initialization complete'); - console.log(`[TRACKING] Currently tracking ${trackedWallets.size} wallets`); } catch (error) { - console.error('[TRACKING] Fatal error starting trade tracking:', error); + console.error('[TRACKING] โŒ Fatal error starting trade tracking:', error); console.error('[TRACKING] Error stack:', (error as Error).stack); } } @@ -355,12 +947,12 @@ app.get('/api/wallets/tracked', async (req: Request, res: Response) => { try { console.log(`[API] GET /api/wallets/tracked called`); - // Get wallets from database - const wallets = await loadWalletsFromDB(); - console.log(`[API] Loaded ${wallets.length} wallets from database`); + // Check for closedOnly query parameter + const closedOnly = req.query.closedOnly === 'true' || req.query.closedOnly === '1'; + // Get all wallets + const wallets = await loadWalletsFromDB(); if (wallets.length === 0) { - console.log(`[API] โš ๏ธ No wallets found in database`); return res.json({ success: true, count: 0, @@ -369,42 +961,51 @@ app.get('/api/wallets/tracked', async (req: Request, res: Response) => { }); } - console.log(`[API] Processing ${wallets.length} wallets for PnL calculation`); - - // Fetch PnL for all tracked wallets in parallel + // Calculate PnL for each wallet (with closedOnly filter if specified) + console.log(`[PnL] Calculating PnL from trades for ${wallets.length} wallets${closedOnly ? ' (closed trades only)' : ''}`); + let completed = 0; + const total = wallets.length; const walletPnLs = await Promise.all( wallets.map(async (wallet) => { - const pnlData = await getWalletPnL(wallet); - if (pnlData) { - // Save PnL snapshot to database - await savePnLSnapshot( - wallet, - pnlData.pnl, - pnlData.accountValue || '0', - pnlData.unrealizedPnl || '0' - ).catch(err => { - console.error(`[DB] Error saving PnL snapshot for ${wallet}:`, err); - }); - } + const pnlData = await getWalletPnL(wallet, closedOnly); + completed++; + console.log(`[PnL] Progress: ${completed}/${total} - ${wallet.substring(0, 14)}...`); return pnlData; }) ); - // Filter out null results (failed requests) const validWalletPnLs = walletPnLs.filter((w): w is WalletPnL => w !== null); - - // Sort by PnL (highest first) - convert string to number for sorting validWalletPnLs.sort((a, b) => { const pnlA = parseFloat(a.pnl) || 0; const pnlB = parseFloat(b.pnl) || 0; return pnlB - pnlA; }); + // Add trades for each wallet (with current prices) + const walletsWithTrades = await Promise.all( + validWalletPnLs.map(async (wallet) => { + const trades = await getTradesForWallet(wallet.wallet); + // Filter out open trades if closedOnly is true + const filteredTrades = closedOnly + ? trades.filter((trade: any) => trade.close_date !== null) + : trades; + + // Enhance trades with current prices + const tradesWithPrices = await enhanceTradesWithCurrentPrices(filteredTrades); + + return { + ...wallet, + trades: tradesWithPrices, + }; + }) + ); + res.json({ success: true, count: validWalletPnLs.length, totalTracked: wallets.length, - wallets: validWalletPnLs, + closedOnly: closedOnly, + wallets: walletsWithTrades, }); } catch (error) { console.error('Error in /api/wallets/tracked:', error); @@ -464,10 +1065,24 @@ app.post('/api/wallets/track', async (req: Request, res: Response) => { app.get('/api/wallets/list', async (req: Request, res: Response) => { try { const wallets = await loadWalletsFromDB(); + + // Add trades for each wallet (with current prices) + const walletsWithTrades = await Promise.all( + wallets.map(async (wallet) => { + const trades = await getTradesForWallet(wallet); + // Enhance trades with current prices + const tradesWithPrices = await enhanceTradesWithCurrentPrices(trades); + return { + address: wallet, + trades: tradesWithPrices, + }; + }) + ); + res.json({ success: true, count: wallets.length, - wallets: wallets, + wallets: walletsWithTrades, }); } catch (error) { console.error('Error in GET /api/wallets/list:', error); @@ -500,9 +1115,18 @@ app.get('/api/wallets/pnl', async (req: Request, res: Response) => { return res.status(400).json({ error: 'No valid wallet addresses provided.' }); } + console.log(`[PnL] Processing ${wallets.length} wallets for PnL calculation`); + let completed = 0; + const total = wallets.length; + // Fetch PnL for all wallets in parallel const walletPnLs = await Promise.all( - wallets.map(wallet => getWalletPnL(wallet)) + wallets.map(async (wallet) => { + const pnlData = await getWalletPnL(wallet); + completed++; + console.log(`[PnL] Progress: ${completed}/${total} - ${wallet.substring(0, 14)}...`); + return pnlData; + }) ); // Filter out null results (failed requests) @@ -515,13 +1139,26 @@ app.get('/api/wallets/pnl', async (req: Request, res: Response) => { return pnlB - pnlA; }); + // Add trades for each wallet (with current prices) + const walletsWithTrades = await Promise.all( + validWalletPnLs.map(async (wallet) => { + const trades = await getTradesForWallet(wallet.wallet); + // Enhance trades with current prices + const tradesWithPrices = await enhanceTradesWithCurrentPrices(trades); + return { + ...wallet, + trades: tradesWithPrices, + }; + }) + ); + res.json({ success: true, count: validWalletPnLs.length, - wallets: validWalletPnLs, + wallets: walletsWithTrades, }); } catch (error) { - console.error('Error in /api/wallets/pnl:', error); + console.error('Error in GET /api/wallets/pnl:', error); res.status(500).json({ error: 'Internal server error' }); } }); @@ -539,9 +1176,18 @@ app.post('/api/wallets/pnl', async (req: Request, res: Response) => { }); } + console.log(`[PnL] Processing ${wallets.length} wallets for PnL calculation`); + let completed = 0; + const total = wallets.length; + // Fetch PnL for all wallets in parallel const walletPnLs = await Promise.all( - wallets.map((wallet: string) => getWalletPnL(wallet)) + wallets.map(async (wallet: string) => { + const pnlData = await getWalletPnL(wallet); + completed++; + console.log(`[PnL] Progress: ${completed}/${total} - ${wallet.substring(0, 14)}...`); + return pnlData; + }) ); // Filter out null results (failed requests) @@ -554,10 +1200,23 @@ app.post('/api/wallets/pnl', async (req: Request, res: Response) => { return pnlB - pnlA; }); + // Add trades for each wallet (with current prices) + const walletsWithTrades = await Promise.all( + validWalletPnLs.map(async (wallet) => { + const trades = await getTradesForWallet(wallet.wallet); + // Enhance trades with current prices + const tradesWithPrices = await enhanceTradesWithCurrentPrices(trades); + return { + ...wallet, + trades: tradesWithPrices, + }; + }) + ); + res.json({ success: true, count: validWalletPnLs.length, - wallets: validWalletPnLs, + wallets: walletsWithTrades, }); } catch (error) { console.error('Error in POST /api/wallets/pnl:', error); @@ -570,6 +1229,7 @@ app.get('/', (req: Request, res: Response) => { message: 'Hyperliquid Wallet PnL Tracker API', endpoints: { 'GET /api/wallets/tracked': 'Get automatically tracked wallets sorted by PnL (no input needed)', + 'GET /api/wallets/tracked?closedOnly=true': 'Get tracked wallets with only closed trades (filters out open trades)', 'GET /api/wallets/list': 'Get list of tracked wallet addresses', 'POST /api/wallets/track': 'Manually add wallets to tracking', 'GET /api/wallets/pnl?wallets=0x...': 'Get specific wallets sorted by PnL (query param)', @@ -598,8 +1258,19 @@ app.listen(PORT, async () => { console.log(`๐Ÿ‘› Currently tracking ${trackedWallets.size} wallets`); + // Start price cache updates (updates every 5 seconds) + startPriceCacheUpdates(); + + // Detect and update closed trades every 5 minutes + console.log('[TRADES] Scheduled: detect closed trades every 5 minutes'); + async function runClosedTradesDetectionCycle(): Promise { + await detectAndUpdateClosedTrades(); + setTimeout(runClosedTradesDetectionCycle, PNL_UPDATE_INTERVAL_MS); + } + // Run once at startup, then every 5 minutes + runClosedTradesDetectionCycle(); + // Start trade tracking after server starts - // Add a small delay to ensure server is fully initialized setTimeout(async () => { await startTradeTracking(); }, 1000);