diff --git a/database/schema.sql b/database/schema.sql index ffc7c19..e404bac 100644 --- a/database/schema.sql +++ b/database/schema.sql @@ -51,3 +51,42 @@ ALTER TABLE trades ADD INDEX IF NOT EXISTS idx_direction (direction); -- Note: PnL is now calculated on-demand from trades table, so the view is no longer needed +-- Trade history table to store historical trades fetched from Hyperliquid API +CREATE TABLE IF NOT EXISTS trade_history ( + id INT AUTO_INCREMENT PRIMARY KEY, + wallet_id INT NOT NULL, + wallet_address VARCHAR(66) NOT NULL COMMENT 'Wallet address (normalized to lowercase)', + trade_id VARCHAR(255) NOT NULL COMMENT 'Unique trade ID from Hyperliquid', + coin VARCHAR(20) NOT NULL COMMENT 'Trading pair/coin symbol', + side ENUM('buy', 'sell') NOT NULL COMMENT 'Trade side: buy or sell', + size DECIMAL(30, 8) NOT NULL COMMENT 'Trade size/amount', + price DECIMAL(30, 8) NOT NULL COMMENT 'Execution price (fill price)', + entry_price DECIMAL(30, 8) NULL COMMENT 'Entry price for position (calculated from fills)', + close_price DECIMAL(30, 8) NULL COMMENT 'Close price for position (calculated from fills)', + closed_pnl DECIMAL(30, 8) NULL COMMENT 'Realized PnL from closing this position (from API)', + fee DECIMAL(30, 8) DEFAULT 0 COMMENT 'Fee paid', + timestamp TIMESTAMP NOT NULL COMMENT 'Time trade executed', + order_type VARCHAR(50) NULL COMMENT 'Order type: market, limit, trigger, etc', + status VARCHAR(50) NULL COMMENT 'Order status: filled, cancelled, open, etc', + extra_data JSON NULL COMMENT 'Additional data from API (liquidation, builder fees, etc)', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + FOREIGN KEY (wallet_id) REFERENCES wallets(id) ON DELETE CASCADE, + UNIQUE KEY unique_trade (wallet_address, trade_id), + INDEX idx_wallet_id (wallet_id), + INDEX idx_wallet_address (wallet_address), + INDEX idx_trade_id (trade_id), + INDEX idx_coin (coin), + INDEX idx_timestamp (timestamp), + INDEX idx_side (side) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Historical trades fetched from Hyperliquid API'; + +-- Migration: Add entry_price and close_price columns if they don't exist +ALTER TABLE trade_history ADD COLUMN IF NOT EXISTS entry_price DECIMAL(30, 8) NULL COMMENT 'Entry price for position (calculated from fills)' AFTER price; +ALTER TABLE trade_history ADD COLUMN IF NOT EXISTS close_price DECIMAL(30, 8) NULL COMMENT 'Close price for position (calculated from fills)' AFTER entry_price; +ALTER TABLE trade_history ADD COLUMN IF NOT EXISTS closed_pnl DECIMAL(30, 8) NULL COMMENT 'Realized PnL from closing this position (from API)' AFTER close_price; + +-- Add column to wallets to track last trade fetch time for rate limiting +ALTER TABLE wallets ADD COLUMN IF NOT EXISTS last_trade_fetch_at TIMESTAMP NULL COMMENT 'Last time historical trades were fetched for this wallet' AFTER last_seen_at; +ALTER TABLE wallets ADD INDEX IF NOT EXISTS idx_last_trade_fetch (last_trade_fetch_at); + diff --git a/src/database.ts b/src/database.ts index a92a83a..1df5681 100644 --- a/src/database.ts +++ b/src/database.ts @@ -615,7 +615,7 @@ export async function calculatePnLFromTrades(walletAddress: string): Promise<{ } /** - * Get all trades for a wallet + * Get all trades for a wallet from trade_history table */ export async function getTradesForWallet(walletAddress: string): Promise { try { @@ -626,23 +626,31 @@ export async function getTradesForWallet(walletAddress: string): Promise `SELECT id, wallet_address, - entry_hash, - entry_date, - close_date, + trade_id as entry_hash, + timestamp as entry_date, + CASE WHEN close_price IS NOT NULL THEN timestamp ELSE NULL END as close_date, coin, - amount, + size as amount, entry_price, close_price, - direction, + closed_pnl, + side as direction, + price, + fee, created_at, - updated_at - FROM trades + updated_at, + extra_data + FROM trade_history WHERE wallet_address = ? - ORDER BY entry_date DESC, created_at DESC`, + ORDER BY timestamp DESC, created_at DESC`, [normalizedWallet] ); - return rows as any[]; + // Parse extra_data JSON if present + return rows.map(row => ({ + ...row, + extra_data: row.extra_data ? (typeof row.extra_data === 'string' ? JSON.parse(row.extra_data) : row.extra_data) : null, + })); } catch (error) { console.error('[DB] Error getting trades for wallet:', error); return []; @@ -728,3 +736,373 @@ export async function updateTradeClose( } } +/** + * Save trade history to database + */ +export async function saveTradeHistoryToDB( + walletAddress: string, + tradeId: string, + coin: string, + side: 'buy' | 'sell', + size: string | number, + price: string | number, + timestamp: Date | string, + fee?: string | number, + orderType?: string, + status?: string, + extraData?: any, + entryPrice?: string | number | null, + closePrice?: string | number | null, + closedPnl?: string | number | null +): Promise { + try { + const normalizedWallet = walletAddress.toLowerCase(); + const connectionPool = getPool(); + + // Get wallet ID + const wallet = await getWalletFromDB(normalizedWallet); + if (!wallet) { + console.error(`[DB] Wallet not found for trade history: ${normalizedWallet}`); + return false; + } + + // Insert or update trade history + await connectionPool.execute( + `INSERT INTO trade_history ( + wallet_id, wallet_address, trade_id, coin, side, size, price, entry_price, close_price, closed_pnl, fee, timestamp, order_type, status, extra_data + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE + coin = VALUES(coin), + side = VALUES(side), + size = VALUES(size), + price = VALUES(price), + entry_price = VALUES(entry_price), + close_price = VALUES(close_price), + closed_pnl = VALUES(closed_pnl), + fee = VALUES(fee), + timestamp = VALUES(timestamp), + order_type = VALUES(order_type), + status = VALUES(status), + extra_data = VALUES(extra_data), + updated_at = CURRENT_TIMESTAMP`, + [ + wallet.id, + normalizedWallet, + tradeId, + coin, + side, + size, + price, + entryPrice || null, + closePrice || null, + closedPnl || null, + fee || 0, + timestamp, + orderType || null, + status || null, + extraData ? JSON.stringify(extraData) : null, + ] + ); + + return true; + } catch (error) { + console.error('[DB] Error saving trade history:', error); + return false; + } +} + +/** + * Get trade history for a wallet + */ +export async function getTradeHistoryForWallet( + walletAddress: string, + limit?: number +): Promise { + try { + const normalizedWallet = walletAddress.toLowerCase(); + const connectionPool = getPool(); + + const query = limit + ? `SELECT * FROM trade_history WHERE wallet_address = ? ORDER BY timestamp DESC LIMIT ?` + : `SELECT * FROM trade_history WHERE wallet_address = ? ORDER BY timestamp DESC`; + + const [rows] = limit + ? await connectionPool.execute(query, [normalizedWallet, limit]) + : await connectionPool.execute(query, [normalizedWallet]); + + // Parse extra_data JSON if present + return rows.map(row => ({ + ...row, + extra_data: row.extra_data ? JSON.parse(row.extra_data) : null, + })); + } catch (error) { + console.error('[DB] Error getting trade history for wallet:', error); + return []; + } +} + +/** + * Get latest trade timestamp for a wallet (for incremental fetching) + */ +export async function getLatestTradeTimestampForWallet(walletAddress: string): Promise { + try { + const normalizedWallet = walletAddress.toLowerCase(); + const connectionPool = getPool(); + + const [rows] = await connectionPool.execute( + `SELECT MAX(timestamp) as latest_timestamp FROM trade_history WHERE wallet_address = ?`, + [normalizedWallet] + ); + + if (rows.length > 0 && rows[0].latest_timestamp) { + return new Date(rows[0].latest_timestamp); + } + + return null; + } catch (error) { + console.error('[DB] Error getting latest trade timestamp:', error); + return null; + } +} + +/** + * Update last trade fetch time for a wallet + */ +export async function updateLastTradeFetchTime(walletAddress: string): Promise { + try { + const normalizedWallet = walletAddress.toLowerCase(); + const connectionPool = getPool(); + + await connectionPool.execute( + `UPDATE wallets SET last_trade_fetch_at = NOW() WHERE address = ?`, + [normalizedWallet] + ); + + return true; + } catch (error) { + console.error('[DB] Error updating last trade fetch time:', error); + return false; + } +} + +/** + * Get wallets that need trade history fetching (sorted by priority) + * Priority: wallets that haven't been fetched recently or never fetched + */ +export async function getWalletsNeedingTradeFetch(limit?: number): Promise { + try { + const connectionPool = getPool(); + + const query = limit + ? `SELECT address FROM wallets + ORDER BY COALESCE(last_trade_fetch_at, '1970-01-01') ASC, last_seen_at DESC + LIMIT ?` + : `SELECT address FROM wallets + ORDER BY COALESCE(last_trade_fetch_at, '1970-01-01') ASC, last_seen_at DESC`; + + const [rows] = limit + ? await connectionPool.execute(query, [limit]) + : await connectionPool.execute(query); + + return rows.map(row => row.address); + } catch (error) { + console.error('[DB] Error getting wallets needing trade fetch:', error); + return []; + } +} + +/** + * Get wallets with PnL calculated from stored closed_pnl (optimized SQL query) + * Uses SQL aggregation instead of JavaScript calculation + */ +export async function getWalletsWithPnL(closedOnly: boolean = false): Promise { + try { + const connectionPool = getPool(); + + const query = closedOnly + ? ` + SELECT + w.address as wallet, + COALESCE(SUM(COALESCE(th.closed_pnl, 0)), 0) as realizedPnl, + COUNT(*) as closedTradesCount, + 0 as openTradesCount + FROM wallets w + INNER JOIN trade_history th ON w.address = th.wallet_address AND th.close_price IS NOT NULL + GROUP BY w.address + ORDER BY realizedPnl DESC + ` + : ` + SELECT + w.address as wallet, + COALESCE(SUM(CASE WHEN th.close_price IS NOT NULL THEN COALESCE(th.closed_pnl, 0) ELSE 0 END), 0) as realizedPnl, + COUNT(CASE WHEN th.close_price IS NOT NULL THEN 1 END) as closedTradesCount, + COUNT(CASE WHEN th.close_price IS NULL THEN 1 END) as openTradesCount + FROM wallets w + LEFT JOIN trade_history th ON w.address = th.wallet_address + GROUP BY w.address + ORDER BY realizedPnl DESC + `; + + const [rows] = await connectionPool.execute(query); + + return rows.map(row => ({ + wallet: row.wallet, + pnl: parseFloat(row.realizedPnl || 0).toFixed(8), + realizedPnl: parseFloat(row.realizedPnl || 0).toFixed(8), + unrealizedPnl: '0.00000000', // Unrealized PnL requires current prices, calculate separately if needed + closedTradesCount: parseInt(row.closedTradesCount || 0), + openTradesCount: parseInt(row.openTradesCount || 0), + })); + } catch (error) { + console.error('[DB] Error getting wallets with PnL:', error); + return []; + } +} + +/** + * Get wallets with their trades using SQL JOIN (optimized single query) + * Groups trades per wallet in SQL instead of multiple queries + * @param closedOnly - If true, only return closed trades + * @param walletAddresses - Optional array of wallet addresses to filter by + * @param limit - Maximum number of wallets to return (default: no limit) + */ +export async function getWalletsWithTrades(closedOnly: boolean = false, walletAddresses?: string[], limit?: number): Promise { + try { + const connectionPool = getPool(); + + const walletParams = walletAddresses && walletAddresses.length > 0 + ? walletAddresses.map(w => w.toLowerCase()) + : []; + + const walletFilter = walletParams.length > 0 + ? `WHERE w.address IN (${walletParams.map(() => '?').join(',')})` + : ''; + + // First get wallets with aggregated PnL + // Always apply limit if provided (default should be 100 from API) + // Ensure limit is a valid number + const validLimit = limit && typeof limit === 'number' && limit > 0 ? limit : undefined; + const limitClause = validLimit ? `LIMIT ?` : ''; + const pnlQuery = closedOnly + ? ` + SELECT + w.address as wallet, + COALESCE(SUM(COALESCE(th.closed_pnl, 0)), 0) as realizedPnl, + COUNT(*) as closedTradesCount, + 0 as openTradesCount + FROM wallets w + INNER JOIN trade_history th ON w.address = th.wallet_address AND th.close_price IS NOT NULL + ${walletFilter} + GROUP BY w.address + ORDER BY realizedPnl DESC + ${limitClause ? limitClause : ''} + `.trim() + : ` + SELECT + w.address as wallet, + COALESCE(SUM(CASE WHEN th.close_price IS NOT NULL THEN COALESCE(th.closed_pnl, 0) ELSE 0 END), 0) as realizedPnl, + COUNT(CASE WHEN th.close_price IS NOT NULL THEN 1 END) as closedTradesCount, + COUNT(CASE WHEN th.close_price IS NULL THEN 1 END) as openTradesCount + FROM wallets w + LEFT JOIN trade_history th ON w.address = th.wallet_address + ${walletFilter} + GROUP BY w.address + ORDER BY realizedPnl DESC + ${limitClause ? limitClause : ''} + `.trim(); + + // Build query parameters: wallet addresses (if any) + limit (if any) + const queryParams: any[] = []; + if (walletParams.length > 0) { + queryParams.push(...walletParams); + } + if (validLimit) { + queryParams.push(validLimit); + } + + console.log(`[DB] getWalletsWithTrades - limit: ${limit}, validLimit: ${validLimit}, limitClause: "${limitClause}", queryParams:`, queryParams); + console.log(`[DB] SQL Query preview: ... ${limitClause || 'NO LIMIT'}`); + + const [pnlRows] = await connectionPool.execute(pnlQuery, queryParams); + + console.log(`[DB] getWalletsWithTrades - returned ${pnlRows.length} rows from SQL`); + + // Apply limit to results if not already applied in SQL (safety check) + // Always apply limit if it was provided, regardless of validLimit check + const finalLimit = limit && typeof limit === 'number' && limit > 0 ? limit : undefined; + const limitedRows = finalLimit && pnlRows.length > finalLimit + ? pnlRows.slice(0, finalLimit) + : pnlRows; + + console.log(`[DB] getWalletsWithTrades - after limit check: ${limitedRows.length} rows (finalLimit: ${finalLimit})`); + + if (limitedRows.length === 0) { + return []; + } + + // Get all trades for these wallets in one query + const walletList = limitedRows.map(row => row.wallet.toLowerCase()); + if (walletList.length === 0) { + return []; + } + + const placeholders = walletList.map(() => '?').join(','); + const closedFilter = closedOnly + ? 'AND th.close_price IS NOT NULL' + : ''; + + const tradesQuery = ` + SELECT + th.wallet_address, + th.id, + th.trade_id as entry_hash, + th.timestamp as entry_date, + CASE WHEN th.close_price IS NOT NULL THEN th.timestamp ELSE NULL END as close_date, + th.coin, + th.size as amount, + th.entry_price, + th.close_price, + th.closed_pnl, + th.side as direction, + th.price, + th.fee, + th.created_at, + th.updated_at, + th.extra_data + FROM trade_history th + WHERE th.wallet_address IN (${placeholders}) ${closedFilter} + ORDER BY th.wallet_address, th.timestamp DESC + `; + + const [tradeRows] = await connectionPool.execute(tradesQuery, walletList); + + // Parse extra_data JSON and group trades by wallet + const tradesByWallet = new Map(); + for (const trade of tradeRows) { + const wallet = trade.wallet_address.toLowerCase(); + if (!tradesByWallet.has(wallet)) { + tradesByWallet.set(wallet, []); + } + tradesByWallet.get(wallet)!.push({ + ...trade, + extra_data: trade.extra_data ? (typeof trade.extra_data === 'string' ? JSON.parse(trade.extra_data) : trade.extra_data) : null, + }); + } + + // Combine PnL data with trades + return pnlRows.map(row => ({ + wallet: row.wallet, + pnl: parseFloat(row.realizedPnl || 0).toFixed(8), + realizedPnl: parseFloat(row.realizedPnl || 0).toFixed(8), + unrealizedPnl: '0.00000000', + closedTradesCount: parseInt(row.closedTradesCount || 0), + openTradesCount: parseInt(row.openTradesCount || 0), + trades: tradesByWallet.get(row.wallet.toLowerCase()) || [], + })); + } catch (error) { + console.error('[DB] Error getting wallets with trades:', error); + return []; + } +} + diff --git a/src/index.ts b/src/index.ts index bf1ef89..88c7f6c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -34,6 +34,13 @@ import { getTradesForWallet, getOpenTrades, updateTradeClose, + saveTradeHistoryToDB, + getTradeHistoryForWallet, + getLatestTradeTimestampForWallet, + updateLastTradeFetchTime, + getWalletsNeedingTradeFetch, + getWalletsWithPnL, + getWalletsWithTrades, } from './database'; // Set WebSocket as global for @nktkas/rews to use in Node.js environment @@ -248,6 +255,7 @@ async function getCurrentPrices(coins: string[]): Promise> { /** * Enhance trades with current prices and calculate PnL per trade * Uses the price cache instead of making API calls + * Works with trade_history table structure */ async function enhanceTradesWithCurrentPrices(trades: any[]): Promise { if (trades.length === 0) { @@ -259,12 +267,29 @@ async function enhanceTradesWithCurrentPrices(trades: any[]): Promise { const coinUpper = trade.coin?.toUpperCase() || ''; const currentPrice = coinUpper ? (priceCache.get(coinUpper) || null) : null; + // Check if trade is closed (has close_price) + const isClosed = trade.close_price !== null && trade.close_price !== undefined; + // Calculate PnL per trade let tradePnl: number | null = null; - if (currentPrice !== null && trade.entry_price) { + + // If closed, use closed_pnl from API if available (most accurate) + if (isClosed && trade.closed_pnl !== null && trade.closed_pnl !== undefined) { + tradePnl = parseFloat(String(trade.closed_pnl)) || null; + } else if (isClosed && trade.close_price && trade.entry_price) { + // Fallback: calculate from entry/close prices for closed trades const entryPrice = parseFloat(String(trade.entry_price)) || 0; - const amount = parseFloat(String(trade.amount)) || 0; - const direction = trade.direction || 'buy'; + const closePrice = parseFloat(String(trade.close_price)) || 0; + const amount = parseFloat(String(trade.amount || trade.size)) || 0; + + if (entryPrice > 0 && closePrice > 0 && amount > 0) { + tradePnl = (closePrice - entryPrice) * amount; + } + } else if (!isClosed && currentPrice !== null && trade.entry_price) { + // For open trades, calculate unrealized PnL using current price + const entryPrice = parseFloat(String(trade.entry_price)) || 0; + const amount = parseFloat(String(trade.amount || trade.size)) || 0; + const direction = trade.direction || trade.side || 'buy'; if (entryPrice > 0 && amount > 0 && currentPrice > 0) { if (direction === 'buy') { @@ -584,9 +609,9 @@ async function getWalletPnL(wallet: string, closedOnly: boolean = false): Promis // Get all trades for this wallet const trades = await getTradesForWallet(wallet); - // Filter trades if closedOnly is true + // Filter trades if closedOnly is true (check close_price instead of close_date) const filteredTrades = closedOnly - ? trades.filter((trade: any) => trade.close_date !== null) + ? trades.filter((trade: any) => trade.close_price !== null && trade.close_price !== undefined) : trades; let realizedPnl = 0; @@ -615,14 +640,23 @@ async function getWalletPnL(wallet: string, closedOnly: boolean = false): Promis // Calculate PnL from trades for (const trade of filteredTrades) { - const amount = parseFloat(trade.amount) || 0; + const amount = parseFloat(trade.amount || trade.size) || 0; const entryPrice = parseFloat(trade.entry_price) || 0; - if (trade.close_date && trade.close_price) { - // Closed trade - calculate realized PnL: (close_price - entry_price) * amount - const closePrice = parseFloat(trade.close_price) || 0; - const tradePnl = (closePrice - entryPrice) * amount; - realizedPnl += tradePnl; + // Check if trade is closed (close_price IS NOT NULL or closed_pnl IS NOT NULL) + const isClosed = trade.close_price !== null && trade.close_price !== undefined; + + if (isClosed) { + // Closed trade - use closed_pnl from API if available, otherwise calculate + if (trade.closed_pnl !== null && trade.closed_pnl !== undefined) { + // Use closed_pnl from API (most accurate) + realizedPnl += parseFloat(trade.closed_pnl) || 0; + } else if (trade.close_price && entryPrice > 0) { + // Fallback: calculate from entry/close prices + const closePrice = parseFloat(trade.close_price) || 0; + const tradePnl = (closePrice - entryPrice) * amount; + realizedPnl += tradePnl; + } closedTradesCount++; } else if (!closedOnly) { // Open trade - calculate unrealized PnL using current price (only if not closedOnly) @@ -763,6 +797,342 @@ async function fetchAndUpdateAllPnL(): Promise<(WalletPnL | null)[]> { return walletPnLs; } +/** + * Fetch historical trade history for a wallet from Hyperliquid API + */ +async function fetchHistoricalTradesForWallet(walletAddress: string): Promise { + try { + if (!walletAddress || walletAddress.length !== 42 || !walletAddress.startsWith('0x')) { + return 0; + } + + console.log(`[HISTORICAL] Fetching trade history for wallet: ${walletAddress.substring(0, 10)}...`); + + // Get latest trade timestamp to fetch only new trades (incremental fetching) + const latestTimestamp = await getLatestTradeTimestampForWallet(walletAddress); + const startTime = latestTimestamp ? Math.floor(latestTimestamp.getTime() / 1000) : undefined; + + // Try to fetch user fills/historical trades from Hyperliquid API + // Note: The exact method name may vary - adjust based on @nktkas/hyperliquid SDK + let fills: any[] = []; + + try { + // Try userFills method (common in Hyperliquid API) + if ((client as any).userFills) { + const response = await (client as any).userFills({ user: walletAddress, startTime }); + fills = Array.isArray(response) ? response : response?.fills || response?.data || []; + } + // Try userFillsByTime if available + else if ((client as any).userFillsByTime) { + const response = await (client as any).userFillsByTime({ user: walletAddress, startTime }); + fills = Array.isArray(response) ? response : response?.fills || response?.data || []; + } + // Try historicalOrders if available + else if ((client as any).historicalOrders) { + const response = await (client as any).historicalOrders({ user: walletAddress, startTime }); + fills = Array.isArray(response) ? response : response?.orders || response?.fills || response?.data || []; + } + // If none of the methods are available, log error + else { + console.error(`[HISTORICAL] No suitable method found on InfoClient to fetch user fills. Available methods may vary.`); + console.error(`[HISTORICAL] Tried: userFills, userFillsByTime, historicalOrders`); + return 0; + } + } catch (apiError: any) { + // If it's a rate limit error, log and return + if (apiError?.message?.includes('rate limit') || apiError?.status === 429) { + console.log(`[HISTORICAL] Rate limit hit for wallet ${walletAddress.substring(0, 10)}...`); + throw apiError; // Re-throw to let caller handle rate limiting + } + console.error(`[HISTORICAL] Error fetching trades for ${walletAddress.substring(0, 10)}...:`, apiError?.message || apiError); + return 0; + } + + if (!Array.isArray(fills) || fills.length === 0) { + console.log(`[HISTORICAL] No new trades found for wallet: ${walletAddress.substring(0, 10)}...`); + await updateLastTradeFetchTime(walletAddress); + return 0; + } + + // Get existing trade history to calculate current positions + const existingTrades = await getTradeHistoryForWallet(walletAddress); + + // Track positions per coin: { coin: { size: number, entryPrice: number } } + const positions = new Map(); + + // Process existing trades to calculate current positions + for (const trade of existingTrades) { + const coin = trade.coin.toUpperCase(); + const size = parseFloat(trade.size) || 0; + const fillPrice = parseFloat(trade.price) || 0; + + if (!positions.has(coin)) { + positions.set(coin, { size: 0, entryPrice: 0, entryValue: 0 }); + } + + const position = positions.get(coin)!; + const absSize = Math.abs(size); + + if (trade.side === 'buy') { + // Adding to position + const newSize = position.size + absSize; + const newValue = position.entryValue + (absSize * fillPrice); + position.size = newSize; + position.entryValue = newValue; + position.entryPrice = newSize > 0 ? newValue / newSize : 0; + } else if (trade.side === 'sell') { + // Reducing position + if (absSize >= position.size) { + // Fully closing position + position.size = 0; + position.entryPrice = 0; + position.entryValue = 0; + } else { + // Partially closing position + position.size = position.size - absSize; + position.entryValue = position.size * position.entryPrice; + } + } + } + + // Process and save each new trade with position tracking + let savedCount = 0; + for (const fill of fills) { + try { + // Extract trade data - adjust fields based on Hyperliquid API response format + const tradeId = fill.id || fill.oid || fill.tradeId || `${fill.time || Date.now()}-${fill.px || ''}-${fill.sz || ''}`; + const coin = fill.coin || fill.symbol || fill.market || ''; + + // Fix side detection: 'B' or 'b' = buy (Bid), 'A' or 'a' = sell (Ask) + let side: 'buy' | 'sell' = 'buy'; // default + if (fill.side === 'B' || fill.side === 'b') { + side = 'buy'; + } else if (fill.side === 'A' || fill.side === 'a') { + side = 'sell'; + } else if (fill.isLong === true || fill.isLong === false) { + // If isLong is explicitly set, use it + side = fill.isLong ? 'buy' : 'sell'; + } else { + // Fallback: check if closedPnl suggests a close (but don't use it for side) + // Try to infer from size or other fields + const sizeNum = parseFloat(fill.sz || fill.size || '0'); + // If size is negative, it might indicate sell + if (sizeNum < 0) { + side = 'sell'; + } + } + const size = fill.sz || fill.size || fill.amount || '0'; + const price = fill.px || fill.price || '0'; + const fee = fill.fee || fill.commission || '0'; + const timestamp = fill.time ? new Date(fill.time > 1e12 ? fill.time : fill.time * 1000) : new Date(); + const orderType = fill.orderType || fill.type || null; + const status = fill.status || 'filled'; + + const coinUpper = coin.toUpperCase(); + const sizeNum = Math.abs(parseFloat(size)); + const priceNum = parseFloat(price); + + // Extract API fields for position tracking + const closedPnl = fill.closedPnl ? parseFloat(fill.closedPnl) : null; + const dir = fill.dir || ''; // e.g., "Close Long" + const startPosition = fill.startPosition ? parseFloat(fill.startPosition) : null; + const isClosing = dir.toLowerCase().includes('close'); + + // Track positions and calculate entry/close prices + if (!positions.has(coinUpper)) { + positions.set(coinUpper, { size: 0, entryPrice: 0, entryValue: 0 }); + } + + const position = positions.get(coinUpper)!; + let entryPrice: number | null = null; + let closePrice: number | null = null; + + // If API provides closedPnl, use it to calculate entry_price + if (isClosing && closedPnl !== null && closedPnl !== undefined && sizeNum > 0) { + // This is a closing trade with PnL + closePrice = priceNum; + + // Calculate entry_price from closedPnl + // For closing long: PnL = (close_price - entry_price) * size + // So: entry_price = close_price - (closedPnl / size) + if (dir.toLowerCase().includes('long')) { + entryPrice = priceNum - (closedPnl / sizeNum); + } else if (dir.toLowerCase().includes('short')) { + // For closing short: PnL = (entry_price - close_price) * size + // So: entry_price = close_price + (closedPnl / size) + entryPrice = priceNum + (closedPnl / sizeNum); + } else { + // Default to long calculation + entryPrice = priceNum - (closedPnl / sizeNum); + } + + // Update position tracking + if (startPosition !== null && startPosition !== undefined) { + // Use startPosition to update position + if (sizeNum >= position.size) { + // Fully closing position + position.size = 0; + position.entryPrice = 0; + position.entryValue = 0; + } else { + // Partially closing position + position.size = position.size - sizeNum; + position.entryValue = position.size * position.entryPrice; + } + } else if (sizeNum >= position.size) { + // Fully closing position + position.size = 0; + position.entryPrice = 0; + position.entryValue = 0; + } else { + // Partially closing position + position.size = position.size - sizeNum; + position.entryValue = position.size * position.entryPrice; + } + } else if (side === 'buy') { + // Opening or adding to position + const wasZero = position.size === 0; + const newSize = position.size + sizeNum; + const newValue = position.entryValue + (sizeNum * priceNum); + + // If opening a new position (was zero), this is the entry price + if (wasZero) { + entryPrice = priceNum; + } else { + // Use weighted average entry price + entryPrice = newValue / newSize; + } + + position.size = newSize; + position.entryValue = newValue; + position.entryPrice = entryPrice; + } else if (side === 'sell') { + // Closing or reducing position + if (position.size > 0) { + // Position exists, this is closing (partially or fully) + closePrice = priceNum; + // Use the current position's entry price for PnL calculation + entryPrice = position.entryPrice > 0 ? position.entryPrice : null; + + if (sizeNum >= position.size) { + // Fully closing position + position.size = 0; + position.entryPrice = 0; + position.entryValue = 0; + } else { + // Partially closing position + position.size = position.size - sizeNum; + position.entryValue = position.size * position.entryPrice; + } + } else { + // No position, this might be a short or closing a short (we don't track shorts) + // For now, just use the fill price + closePrice = null; + entryPrice = null; + } + } + + // Save trade history with entry/close prices and closedPnl + const saved = await saveTradeHistoryToDB( + walletAddress, + String(tradeId), + coinUpper, + side, + size, + price, + timestamp, + fee, + orderType, + status, + fill, // Store full fill data in extra_data + entryPrice ? String(entryPrice) : null, + closePrice ? String(closePrice) : null, + closedPnl !== null && closedPnl !== undefined ? String(closedPnl) : null + ); + + if (saved) { + savedCount++; + } + } catch (tradeError) { + console.error(`[HISTORICAL] Error saving trade for ${walletAddress.substring(0, 10)}...:`, tradeError); + } + } + + // Update last fetch time + await updateLastTradeFetchTime(walletAddress); + + console.log(`[HISTORICAL] ✅ Fetched and saved ${savedCount} trades for wallet: ${walletAddress.substring(0, 10)}...`); + + return savedCount; + } catch (error: any) { + console.error(`[HISTORICAL] Error fetching historical trades for ${walletAddress.substring(0, 10)}...:`, error?.message || error); + // Don't update last fetch time on error (so we can retry) + return 0; + } +} + +/** + * Rate-limited background job to fetch historical trades for all wallets + * Respects rate limits by processing wallets sequentially with delays + */ +async function startHistoricalTradeFetcher(): Promise { + console.log('[HISTORICAL FETCHER] Starting historical trade fetcher...'); + + // Rate limiting configuration + const RATE_LIMIT_REQUESTS_PER_MINUTE = 20; // Conservative limit + const RATE_LIMIT_DELAY_MS = Math.ceil(60000 / RATE_LIMIT_REQUESTS_PER_MINUTE); // Delay between requests + const BATCH_SIZE = 10; // Process wallets in batches + + async function fetchBatch(): Promise { + try { + // Get wallets that need fetching (prioritized by last fetch time) + const walletsToFetch = await getWalletsNeedingTradeFetch(BATCH_SIZE); + + if (walletsToFetch.length === 0) { + console.log('[HISTORICAL FETCHER] No wallets need fetching right now'); + return; + } + + console.log(`[HISTORICAL FETCHER] Processing batch of ${walletsToFetch.length} wallets...`); + + // Process wallets sequentially with rate limiting + for (const wallet of walletsToFetch) { + try { + await fetchHistoricalTradesForWallet(wallet); + + // Wait between requests to respect rate limits + await new Promise(resolve => setTimeout(resolve, RATE_LIMIT_DELAY_MS)); + } catch (error: any) { + // If rate limited, wait longer before continuing + if (error?.message?.includes('rate limit') || error?.status === 429) { + console.log('[HISTORICAL FETCHER] Rate limit hit, waiting 60 seconds...'); + await new Promise(resolve => setTimeout(resolve, 60000)); // Wait 1 minute + } else { + console.error(`[HISTORICAL FETCHER] Error fetching for ${wallet.substring(0, 10)}...:`, error?.message || error); + } + } + } + + console.log(`[HISTORICAL FETCHER] ✅ Batch complete`); + } catch (error) { + console.error('[HISTORICAL FETCHER] Error in fetch batch:', error); + } + } + + // Run initial fetch + fetchBatch(); + + // Then run every 5 minutes + const FETCH_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + setInterval(() => { + fetchBatch().catch(error => { + console.error('[HISTORICAL FETCHER] Error in scheduled fetch:', error); + }); + }, FETCH_INTERVAL_MS); + + console.log(`[HISTORICAL FETCHER] ✅ Historical trade fetcher started (runs every 5 minutes)`); +} + /** * Check if a trade event represents a closing event (position closure) * This function determines if a trade is closing a position @@ -802,12 +1172,12 @@ function isOpeningEvent(tradeData: any): boolean { } /** - * Start tracking trade events (both opening and closing) to automatically collect wallet addresses - * Listens to all trade events and processes both opens and closes + * Start tracking trade events to automatically collect wallet addresses + * Only stores wallets - trades will be fetched via historical API */ async function startTradeTracking(): Promise { try { - console.log('[TRACKING] Starting trade tracking (opens and closes)...'); + console.log('[TRACKING] Starting wallet tracking (trades will be fetched via historical API)...'); console.log('[TRACKING] WebSocket transport:', wsTransport); // Wait for WebSocket connection to be ready @@ -825,13 +1195,13 @@ async function startTradeTracking(): Promise { throw error; } - // Subscribe to all trades and process both opens and closes - console.log(`[TRACKING] Subscribing to all trade events (opens and closes)...`); + // Subscribe to all trades to collect wallet addresses only + console.log(`[TRACKING] Subscribing to trade events to collect wallet addresses...`); try { - console.log(`[TRACKING] Attempting to subscribe to all trade events...`); + console.log(`[TRACKING] Attempting to subscribe to trade events...`); - // Subscribe to trades for multiple coins - process both opens and closes + // Subscribe to trades for multiple coins - only collect wallet addresses const coinsToTrack = ['ETH', 'BTC', 'SOL', 'ARB', 'AVAX', 'BNB', 'MATIC', 'ADA', 'DOGE', 'XRP']; console.log(`[TRACKING] Subscribing to trades for: ${coinsToTrack.join(', ')}`); @@ -895,28 +1265,24 @@ async function startTradeTracking(): Promise { const uniqueWallets = [...new Set(foundWallets.map(w => w.toLowerCase()))]; if (uniqueWallets.length === 0) { - console.log(`[TRADE] ⚠️ No wallet addresses found in trade event`); - return; + return; // No wallet addresses found, skip } - // Process wallets in parallel - save all trades as buy/sell + // Only add wallets to tracking - trades will be fetched via historical API Promise.all(uniqueWallets.map(async (w) => { try { // Add wallet to tracking (both in-memory and database) await addWallet(w); - - // Process and save trade (buy or sell) - await processAndSaveTrade(t, coin, w); } catch (error) { - console.error(`[TRADE] Error processing wallet ${w}:`, error); + console.error(`[TRADE] Error adding wallet ${w}:`, error); } })).catch(error => { console.error('[TRADE] Error in Promise.all for wallets:', error); }); }); } catch (error) { - console.error('[TRADE CLOSE] Error processing closing event:', error); - console.error('[TRADE CLOSE] Error stack:', (error as Error).stack); + console.error('[TRADE] Error processing trade event:', error); + console.error('[TRADE] Error stack:', (error as Error).stack); } }); @@ -927,21 +1293,21 @@ async function startTradeTracking(): Promise { } } - console.log('[TRACKING] ✅ Trade tracking initialization complete (buys and sells)'); + console.log('[TRACKING] ✅ Wallet tracking initialization complete'); console.log(`[TRACKING] Currently tracking ${trackedWallets.size} wallets`); } catch (error) { - console.error('[TRACKING] ❌ Fatal error starting trade tracking:', error); + console.error('[TRACKING] ❌ Fatal error starting wallet tracking:', error); console.error('[TRACKING] Error stack:', (error as Error).stack); } } catch (error) { - console.error('[TRACKING] ❌ Fatal error starting trade tracking:', error); + console.error('[TRACKING] ❌ Fatal error starting wallet tracking:', error); console.error('[TRACKING] Error stack:', (error as Error).stack); } } /** * API endpoint to get tracked wallets sorted by PnL - * No input required - uses automatically tracked wallets + * Optimized: Uses SQL JOINs and stored PnL instead of calculating in JavaScript */ app.get('/api/wallets/tracked', async (req: Request, res: Response) => { try { @@ -950,9 +1316,20 @@ app.get('/api/wallets/tracked', async (req: Request, res: Response) => { // Check for closedOnly query parameter const closedOnly = req.query.closedOnly === 'true' || req.query.closedOnly === '1'; - // Get all wallets - const wallets = await loadWalletsFromDB(); - if (wallets.length === 0) { + // Get limit parameter (default: 100, max: 1000) + const limitParam = req.query.limit as string; + let limit: number = 100; // default to 100 + if (limitParam) { + const parsedLimit = parseInt(limitParam, 10); + if (!isNaN(parsedLimit) && parsedLimit > 0) { + limit = Math.min(parsedLimit, 1000); // cap at 1000 + } + } + + // Get wallets with PnL and trades using optimized SQL query (single query with JOINs) + const walletsWithTrades = await getWalletsWithTrades(closedOnly, undefined, limit); + + if (walletsWithTrades.length === 0) { return res.json({ success: true, count: 0, @@ -961,51 +1338,20 @@ app.get('/api/wallets/tracked', async (req: Request, res: Response) => { }); } - // Calculate PnL for each wallet (with closedOnly filter if specified) - console.log(`[PnL] Calculating PnL from trades for ${wallets.length} wallets${closedOnly ? ' (closed trades only)' : ''}`); - let completed = 0; - const total = wallets.length; - const walletPnLs = await Promise.all( - wallets.map(async (wallet) => { - const pnlData = await getWalletPnL(wallet, closedOnly); - completed++; - console.log(`[PnL] Progress: ${completed}/${total} - ${wallet.substring(0, 14)}...`); - return pnlData; - }) - ); - - const validWalletPnLs = walletPnLs.filter((w): w is WalletPnL => w !== null); - validWalletPnLs.sort((a, b) => { - const pnlA = parseFloat(a.pnl) || 0; - const pnlB = parseFloat(b.pnl) || 0; - return pnlB - pnlA; - }); - - // Add trades for each wallet (with current prices) - const walletsWithTrades = await Promise.all( - validWalletPnLs.map(async (wallet) => { - const trades = await getTradesForWallet(wallet.wallet); - // Filter out open trades if closedOnly is true - const filteredTrades = closedOnly - ? trades.filter((trade: any) => trade.close_date !== null) - : trades; - - // Enhance trades with current prices - const tradesWithPrices = await enhanceTradesWithCurrentPrices(filteredTrades); - - return { - ...wallet, - trades: tradesWithPrices, - }; - }) + // Enhance trades with current prices (only for display, PnL already calculated from stored closed_pnl) + const enhancedWallets = await Promise.all( + walletsWithTrades.map(async wallet => ({ + ...wallet, + trades: await enhanceTradesWithCurrentPrices(wallet.trades), + })) ); res.json({ success: true, - count: validWalletPnLs.length, - totalTracked: wallets.length, + count: enhancedWallets.length, + totalTracked: enhancedWallets.length, closedOnly: closedOnly, - wallets: walletsWithTrades, + wallets: enhancedWallets, }); } catch (error) { console.error('Error in /api/wallets/tracked:', error); @@ -1061,28 +1407,25 @@ app.post('/api/wallets/track', async (req: Request, res: Response) => { /** * API endpoint to get list of tracked wallet addresses (without PnL) + * Optimized: Uses SQL JOIN to get wallets with trades in single query */ app.get('/api/wallets/list', async (req: Request, res: Response) => { try { - const wallets = await loadWalletsFromDB(); + // Get wallets with trades using optimized SQL query + const walletsWithTrades = await getWalletsWithTrades(false); - // Add trades for each wallet (with current prices) - const walletsWithTrades = await Promise.all( - wallets.map(async (wallet) => { - const trades = await getTradesForWallet(wallet); - // Enhance trades with current prices - const tradesWithPrices = await enhanceTradesWithCurrentPrices(trades); - return { - address: wallet, - trades: tradesWithPrices, - }; - }) + // Format response (remove PnL data, keep only address and trades) + const enhancedWallets = await Promise.all( + walletsWithTrades.map(async wallet => ({ + address: wallet.wallet, + trades: await enhanceTradesWithCurrentPrices(wallet.trades), + })) ); res.json({ success: true, - count: wallets.length, - wallets: walletsWithTrades, + count: enhancedWallets.length, + wallets: enhancedWallets, }); } catch (error) { console.error('Error in GET /api/wallets/list:', error); @@ -1115,47 +1458,23 @@ app.get('/api/wallets/pnl', async (req: Request, res: Response) => { return res.status(400).json({ error: 'No valid wallet addresses provided.' }); } - console.log(`[PnL] Processing ${wallets.length} wallets for PnL calculation`); - let completed = 0; - const total = wallets.length; + console.log(`[PnL] Processing ${wallets.length} wallets using optimized SQL query`); - // Fetch PnL for all wallets in parallel - const walletPnLs = await Promise.all( - wallets.map(async (wallet) => { - const pnlData = await getWalletPnL(wallet); - completed++; - console.log(`[PnL] Progress: ${completed}/${total} - ${wallet.substring(0, 14)}...`); - return pnlData; - }) - ); + // Get wallets with PnL and trades using optimized SQL query (single query with JOINs) + const walletsWithTrades = await getWalletsWithTrades(false, wallets); - // Filter out null results (failed requests) - const validWalletPnLs = walletPnLs.filter((w): w is WalletPnL => w !== null); - - // Sort by PnL (highest first) - convert string to number for sorting - validWalletPnLs.sort((a, b) => { - const pnlA = parseFloat(a.pnl) || 0; - const pnlB = parseFloat(b.pnl) || 0; - return pnlB - pnlA; - }); - - // Add trades for each wallet (with current prices) - const walletsWithTrades = await Promise.all( - validWalletPnLs.map(async (wallet) => { - const trades = await getTradesForWallet(wallet.wallet); - // Enhance trades with current prices - const tradesWithPrices = await enhanceTradesWithCurrentPrices(trades); - return { - ...wallet, - trades: tradesWithPrices, - }; - }) + // Enhance trades with current prices + const enhancedWallets = await Promise.all( + walletsWithTrades.map(async wallet => ({ + ...wallet, + trades: await enhanceTradesWithCurrentPrices(wallet.trades), + })) ); res.json({ success: true, - count: validWalletPnLs.length, - wallets: walletsWithTrades, + count: enhancedWallets.length, + wallets: enhancedWallets, }); } catch (error) { console.error('Error in GET /api/wallets/pnl:', error); @@ -1176,47 +1495,23 @@ app.post('/api/wallets/pnl', async (req: Request, res: Response) => { }); } - console.log(`[PnL] Processing ${wallets.length} wallets for PnL calculation`); - let completed = 0; - const total = wallets.length; + console.log(`[PnL] Processing ${wallets.length} wallets using optimized SQL query`); - // Fetch PnL for all wallets in parallel - const walletPnLs = await Promise.all( - wallets.map(async (wallet: string) => { - const pnlData = await getWalletPnL(wallet); - completed++; - console.log(`[PnL] Progress: ${completed}/${total} - ${wallet.substring(0, 14)}...`); - return pnlData; - }) - ); + // Get wallets with PnL and trades using optimized SQL query (single query with JOINs) + const walletsWithTrades = await getWalletsWithTrades(false, wallets); - // Filter out null results (failed requests) - const validWalletPnLs = walletPnLs.filter((w): w is WalletPnL => w !== null); - - // Sort by PnL (highest first) - convert string to number for sorting - validWalletPnLs.sort((a, b) => { - const pnlA = parseFloat(a.pnl) || 0; - const pnlB = parseFloat(b.pnl) || 0; - return pnlB - pnlA; - }); - - // Add trades for each wallet (with current prices) - const walletsWithTrades = await Promise.all( - validWalletPnLs.map(async (wallet) => { - const trades = await getTradesForWallet(wallet.wallet); - // Enhance trades with current prices - const tradesWithPrices = await enhanceTradesWithCurrentPrices(trades); - return { - ...wallet, - trades: tradesWithPrices, - }; - }) + // Enhance trades with current prices + const enhancedWallets = await Promise.all( + walletsWithTrades.map(async wallet => ({ + ...wallet, + trades: await enhanceTradesWithCurrentPrices(wallet.trades), + })) ); res.json({ success: true, - count: validWalletPnLs.length, - wallets: walletsWithTrades, + count: enhancedWallets.length, + wallets: enhancedWallets, }); } catch (error) { console.error('Error in POST /api/wallets/pnl:', error); @@ -1270,7 +1565,15 @@ app.listen(PORT, async () => { // Run once at startup, then every 5 minutes runClosedTradesDetectionCycle(); - // Start trade tracking after server starts + // Start historical trade fetcher (fetches trade history for all wallets) + // Delay to let database initialize first + setTimeout(() => { + startHistoricalTradeFetcher().catch(error => { + console.error('[HISTORICAL FETCHER] Error starting historical trade fetcher:', error); + }); + }, 2000); + + // Start trade tracking after server starts (only collects wallets, not trades) setTimeout(async () => { await startTradeTracking(); }, 1000);