This commit is contained in:
React User
2026-01-18 01:03:36 +00:00
parent 62dad43e26
commit bd53db8cf2
3 changed files with 889 additions and 169 deletions

View File

@@ -51,3 +51,42 @@ ALTER TABLE trades ADD INDEX IF NOT EXISTS idx_direction (direction);
-- Note: PnL is now calculated on-demand from trades table, so the view is no longer needed
-- Trade history table to store historical trades fetched from Hyperliquid API
CREATE TABLE IF NOT EXISTS trade_history (
id INT AUTO_INCREMENT PRIMARY KEY,
wallet_id INT NOT NULL,
wallet_address VARCHAR(66) NOT NULL COMMENT 'Wallet address (normalized to lowercase)',
trade_id VARCHAR(255) NOT NULL COMMENT 'Unique trade ID from Hyperliquid',
coin VARCHAR(20) NOT NULL COMMENT 'Trading pair/coin symbol',
side ENUM('buy', 'sell') NOT NULL COMMENT 'Trade side: buy or sell',
size DECIMAL(30, 8) NOT NULL COMMENT 'Trade size/amount',
price DECIMAL(30, 8) NOT NULL COMMENT 'Execution price (fill price)',
entry_price DECIMAL(30, 8) NULL COMMENT 'Entry price for position (calculated from fills)',
close_price DECIMAL(30, 8) NULL COMMENT 'Close price for position (calculated from fills)',
closed_pnl DECIMAL(30, 8) NULL COMMENT 'Realized PnL from closing this position (from API)',
fee DECIMAL(30, 8) DEFAULT 0 COMMENT 'Fee paid',
timestamp TIMESTAMP NOT NULL COMMENT 'Time trade executed',
order_type VARCHAR(50) NULL COMMENT 'Order type: market, limit, trigger, etc',
status VARCHAR(50) NULL COMMENT 'Order status: filled, cancelled, open, etc',
extra_data JSON NULL COMMENT 'Additional data from API (liquidation, builder fees, etc)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (wallet_id) REFERENCES wallets(id) ON DELETE CASCADE,
UNIQUE KEY unique_trade (wallet_address, trade_id),
INDEX idx_wallet_id (wallet_id),
INDEX idx_wallet_address (wallet_address),
INDEX idx_trade_id (trade_id),
INDEX idx_coin (coin),
INDEX idx_timestamp (timestamp),
INDEX idx_side (side)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Historical trades fetched from Hyperliquid API';
-- Migration: Add entry_price and close_price columns if they don't exist
ALTER TABLE trade_history ADD COLUMN IF NOT EXISTS entry_price DECIMAL(30, 8) NULL COMMENT 'Entry price for position (calculated from fills)' AFTER price;
ALTER TABLE trade_history ADD COLUMN IF NOT EXISTS close_price DECIMAL(30, 8) NULL COMMENT 'Close price for position (calculated from fills)' AFTER entry_price;
ALTER TABLE trade_history ADD COLUMN IF NOT EXISTS closed_pnl DECIMAL(30, 8) NULL COMMENT 'Realized PnL from closing this position (from API)' AFTER close_price;
-- Add column to wallets to track last trade fetch time for rate limiting
ALTER TABLE wallets ADD COLUMN IF NOT EXISTS last_trade_fetch_at TIMESTAMP NULL COMMENT 'Last time historical trades were fetched for this wallet' AFTER last_seen_at;
ALTER TABLE wallets ADD INDEX IF NOT EXISTS idx_last_trade_fetch (last_trade_fetch_at);

View File

@@ -615,7 +615,7 @@ export async function calculatePnLFromTrades(walletAddress: string): Promise<{
}
/**
* Get all trades for a wallet
* Get all trades for a wallet from trade_history table
*/
export async function getTradesForWallet(walletAddress: string): Promise<any[]> {
try {
@@ -626,23 +626,31 @@ export async function getTradesForWallet(walletAddress: string): Promise<any[]>
`SELECT
id,
wallet_address,
entry_hash,
entry_date,
close_date,
trade_id as entry_hash,
timestamp as entry_date,
CASE WHEN close_price IS NOT NULL THEN timestamp ELSE NULL END as close_date,
coin,
amount,
size as amount,
entry_price,
close_price,
direction,
closed_pnl,
side as direction,
price,
fee,
created_at,
updated_at
FROM trades
updated_at,
extra_data
FROM trade_history
WHERE wallet_address = ?
ORDER BY entry_date DESC, created_at DESC`,
ORDER BY timestamp DESC, created_at DESC`,
[normalizedWallet]
);
return rows as any[];
// Parse extra_data JSON if present
return rows.map(row => ({
...row,
extra_data: row.extra_data ? (typeof row.extra_data === 'string' ? JSON.parse(row.extra_data) : row.extra_data) : null,
}));
} catch (error) {
console.error('[DB] Error getting trades for wallet:', error);
return [];
@@ -728,3 +736,373 @@ export async function updateTradeClose(
}
}
/**
* Save trade history to database
*/
export async function saveTradeHistoryToDB(
walletAddress: string,
tradeId: string,
coin: string,
side: 'buy' | 'sell',
size: string | number,
price: string | number,
timestamp: Date | string,
fee?: string | number,
orderType?: string,
status?: string,
extraData?: any,
entryPrice?: string | number | null,
closePrice?: string | number | null,
closedPnl?: string | number | null
): Promise<boolean> {
try {
const normalizedWallet = walletAddress.toLowerCase();
const connectionPool = getPool();
// Get wallet ID
const wallet = await getWalletFromDB(normalizedWallet);
if (!wallet) {
console.error(`[DB] Wallet not found for trade history: ${normalizedWallet}`);
return false;
}
// Insert or update trade history
await connectionPool.execute(
`INSERT INTO trade_history (
wallet_id, wallet_address, trade_id, coin, side, size, price, entry_price, close_price, closed_pnl, fee, timestamp, order_type, status, extra_data
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
coin = VALUES(coin),
side = VALUES(side),
size = VALUES(size),
price = VALUES(price),
entry_price = VALUES(entry_price),
close_price = VALUES(close_price),
closed_pnl = VALUES(closed_pnl),
fee = VALUES(fee),
timestamp = VALUES(timestamp),
order_type = VALUES(order_type),
status = VALUES(status),
extra_data = VALUES(extra_data),
updated_at = CURRENT_TIMESTAMP`,
[
wallet.id,
normalizedWallet,
tradeId,
coin,
side,
size,
price,
entryPrice || null,
closePrice || null,
closedPnl || null,
fee || 0,
timestamp,
orderType || null,
status || null,
extraData ? JSON.stringify(extraData) : null,
]
);
return true;
} catch (error) {
console.error('[DB] Error saving trade history:', error);
return false;
}
}
/**
* Get trade history for a wallet
*/
export async function getTradeHistoryForWallet(
walletAddress: string,
limit?: number
): Promise<any[]> {
try {
const normalizedWallet = walletAddress.toLowerCase();
const connectionPool = getPool();
const query = limit
? `SELECT * FROM trade_history WHERE wallet_address = ? ORDER BY timestamp DESC LIMIT ?`
: `SELECT * FROM trade_history WHERE wallet_address = ? ORDER BY timestamp DESC`;
const [rows] = limit
? await connectionPool.execute<mysql.RowDataPacket[]>(query, [normalizedWallet, limit])
: await connectionPool.execute<mysql.RowDataPacket[]>(query, [normalizedWallet]);
// Parse extra_data JSON if present
return rows.map(row => ({
...row,
extra_data: row.extra_data ? JSON.parse(row.extra_data) : null,
}));
} catch (error) {
console.error('[DB] Error getting trade history for wallet:', error);
return [];
}
}
/**
* Get latest trade timestamp for a wallet (for incremental fetching)
*/
export async function getLatestTradeTimestampForWallet(walletAddress: string): Promise<Date | null> {
try {
const normalizedWallet = walletAddress.toLowerCase();
const connectionPool = getPool();
const [rows] = await connectionPool.execute<mysql.RowDataPacket[]>(
`SELECT MAX(timestamp) as latest_timestamp FROM trade_history WHERE wallet_address = ?`,
[normalizedWallet]
);
if (rows.length > 0 && rows[0].latest_timestamp) {
return new Date(rows[0].latest_timestamp);
}
return null;
} catch (error) {
console.error('[DB] Error getting latest trade timestamp:', error);
return null;
}
}
/**
* Update last trade fetch time for a wallet
*/
export async function updateLastTradeFetchTime(walletAddress: string): Promise<boolean> {
try {
const normalizedWallet = walletAddress.toLowerCase();
const connectionPool = getPool();
await connectionPool.execute(
`UPDATE wallets SET last_trade_fetch_at = NOW() WHERE address = ?`,
[normalizedWallet]
);
return true;
} catch (error) {
console.error('[DB] Error updating last trade fetch time:', error);
return false;
}
}
/**
* Get wallets that need trade history fetching (sorted by priority)
* Priority: wallets that haven't been fetched recently or never fetched
*/
export async function getWalletsNeedingTradeFetch(limit?: number): Promise<string[]> {
try {
const connectionPool = getPool();
const query = limit
? `SELECT address FROM wallets
ORDER BY COALESCE(last_trade_fetch_at, '1970-01-01') ASC, last_seen_at DESC
LIMIT ?`
: `SELECT address FROM wallets
ORDER BY COALESCE(last_trade_fetch_at, '1970-01-01') ASC, last_seen_at DESC`;
const [rows] = limit
? await connectionPool.execute<mysql.RowDataPacket[]>(query, [limit])
: await connectionPool.execute<mysql.RowDataPacket[]>(query);
return rows.map(row => row.address);
} catch (error) {
console.error('[DB] Error getting wallets needing trade fetch:', error);
return [];
}
}
/**
* Get wallets with PnL calculated from stored closed_pnl (optimized SQL query)
* Uses SQL aggregation instead of JavaScript calculation
*/
export async function getWalletsWithPnL(closedOnly: boolean = false): Promise<any[]> {
try {
const connectionPool = getPool();
const query = closedOnly
? `
SELECT
w.address as wallet,
COALESCE(SUM(COALESCE(th.closed_pnl, 0)), 0) as realizedPnl,
COUNT(*) as closedTradesCount,
0 as openTradesCount
FROM wallets w
INNER JOIN trade_history th ON w.address = th.wallet_address AND th.close_price IS NOT NULL
GROUP BY w.address
ORDER BY realizedPnl DESC
`
: `
SELECT
w.address as wallet,
COALESCE(SUM(CASE WHEN th.close_price IS NOT NULL THEN COALESCE(th.closed_pnl, 0) ELSE 0 END), 0) as realizedPnl,
COUNT(CASE WHEN th.close_price IS NOT NULL THEN 1 END) as closedTradesCount,
COUNT(CASE WHEN th.close_price IS NULL THEN 1 END) as openTradesCount
FROM wallets w
LEFT JOIN trade_history th ON w.address = th.wallet_address
GROUP BY w.address
ORDER BY realizedPnl DESC
`;
const [rows] = await connectionPool.execute<mysql.RowDataPacket[]>(query);
return rows.map(row => ({
wallet: row.wallet,
pnl: parseFloat(row.realizedPnl || 0).toFixed(8),
realizedPnl: parseFloat(row.realizedPnl || 0).toFixed(8),
unrealizedPnl: '0.00000000', // Unrealized PnL requires current prices, calculate separately if needed
closedTradesCount: parseInt(row.closedTradesCount || 0),
openTradesCount: parseInt(row.openTradesCount || 0),
}));
} catch (error) {
console.error('[DB] Error getting wallets with PnL:', error);
return [];
}
}
/**
* Get wallets with their trades using SQL JOIN (optimized single query)
* Groups trades per wallet in SQL instead of multiple queries
* @param closedOnly - If true, only return closed trades
* @param walletAddresses - Optional array of wallet addresses to filter by
* @param limit - Maximum number of wallets to return (default: no limit)
*/
export async function getWalletsWithTrades(closedOnly: boolean = false, walletAddresses?: string[], limit?: number): Promise<any[]> {
try {
const connectionPool = getPool();
const walletParams = walletAddresses && walletAddresses.length > 0
? walletAddresses.map(w => w.toLowerCase())
: [];
const walletFilter = walletParams.length > 0
? `WHERE w.address IN (${walletParams.map(() => '?').join(',')})`
: '';
// First get wallets with aggregated PnL
// Always apply limit if provided (default should be 100 from API)
// Ensure limit is a valid number
const validLimit = limit && typeof limit === 'number' && limit > 0 ? limit : undefined;
const limitClause = validLimit ? `LIMIT ?` : '';
const pnlQuery = closedOnly
? `
SELECT
w.address as wallet,
COALESCE(SUM(COALESCE(th.closed_pnl, 0)), 0) as realizedPnl,
COUNT(*) as closedTradesCount,
0 as openTradesCount
FROM wallets w
INNER JOIN trade_history th ON w.address = th.wallet_address AND th.close_price IS NOT NULL
${walletFilter}
GROUP BY w.address
ORDER BY realizedPnl DESC
${limitClause ? limitClause : ''}
`.trim()
: `
SELECT
w.address as wallet,
COALESCE(SUM(CASE WHEN th.close_price IS NOT NULL THEN COALESCE(th.closed_pnl, 0) ELSE 0 END), 0) as realizedPnl,
COUNT(CASE WHEN th.close_price IS NOT NULL THEN 1 END) as closedTradesCount,
COUNT(CASE WHEN th.close_price IS NULL THEN 1 END) as openTradesCount
FROM wallets w
LEFT JOIN trade_history th ON w.address = th.wallet_address
${walletFilter}
GROUP BY w.address
ORDER BY realizedPnl DESC
${limitClause ? limitClause : ''}
`.trim();
// Build query parameters: wallet addresses (if any) + limit (if any)
const queryParams: any[] = [];
if (walletParams.length > 0) {
queryParams.push(...walletParams);
}
if (validLimit) {
queryParams.push(validLimit);
}
console.log(`[DB] getWalletsWithTrades - limit: ${limit}, validLimit: ${validLimit}, limitClause: "${limitClause}", queryParams:`, queryParams);
console.log(`[DB] SQL Query preview: ... ${limitClause || 'NO LIMIT'}`);
const [pnlRows] = await connectionPool.execute<mysql.RowDataPacket[]>(pnlQuery, queryParams);
console.log(`[DB] getWalletsWithTrades - returned ${pnlRows.length} rows from SQL`);
// Apply limit to results if not already applied in SQL (safety check)
// Always apply limit if it was provided, regardless of validLimit check
const finalLimit = limit && typeof limit === 'number' && limit > 0 ? limit : undefined;
const limitedRows = finalLimit && pnlRows.length > finalLimit
? pnlRows.slice(0, finalLimit)
: pnlRows;
console.log(`[DB] getWalletsWithTrades - after limit check: ${limitedRows.length} rows (finalLimit: ${finalLimit})`);
if (limitedRows.length === 0) {
return [];
}
// Get all trades for these wallets in one query
const walletList = limitedRows.map(row => row.wallet.toLowerCase());
if (walletList.length === 0) {
return [];
}
const placeholders = walletList.map(() => '?').join(',');
const closedFilter = closedOnly
? 'AND th.close_price IS NOT NULL'
: '';
const tradesQuery = `
SELECT
th.wallet_address,
th.id,
th.trade_id as entry_hash,
th.timestamp as entry_date,
CASE WHEN th.close_price IS NOT NULL THEN th.timestamp ELSE NULL END as close_date,
th.coin,
th.size as amount,
th.entry_price,
th.close_price,
th.closed_pnl,
th.side as direction,
th.price,
th.fee,
th.created_at,
th.updated_at,
th.extra_data
FROM trade_history th
WHERE th.wallet_address IN (${placeholders}) ${closedFilter}
ORDER BY th.wallet_address, th.timestamp DESC
`;
const [tradeRows] = await connectionPool.execute<mysql.RowDataPacket[]>(tradesQuery, walletList);
// Parse extra_data JSON and group trades by wallet
const tradesByWallet = new Map<string, any[]>();
for (const trade of tradeRows) {
const wallet = trade.wallet_address.toLowerCase();
if (!tradesByWallet.has(wallet)) {
tradesByWallet.set(wallet, []);
}
tradesByWallet.get(wallet)!.push({
...trade,
extra_data: trade.extra_data ? (typeof trade.extra_data === 'string' ? JSON.parse(trade.extra_data) : trade.extra_data) : null,
});
}
// Combine PnL data with trades
return pnlRows.map(row => ({
wallet: row.wallet,
pnl: parseFloat(row.realizedPnl || 0).toFixed(8),
realizedPnl: parseFloat(row.realizedPnl || 0).toFixed(8),
unrealizedPnl: '0.00000000',
closedTradesCount: parseInt(row.closedTradesCount || 0),
openTradesCount: parseInt(row.openTradesCount || 0),
trades: tradesByWallet.get(row.wallet.toLowerCase()) || [],
}));
} catch (error) {
console.error('[DB] Error getting wallets with trades:', error);
return [];
}
}

View File

@@ -34,6 +34,13 @@ import {
getTradesForWallet,
getOpenTrades,
updateTradeClose,
saveTradeHistoryToDB,
getTradeHistoryForWallet,
getLatestTradeTimestampForWallet,
updateLastTradeFetchTime,
getWalletsNeedingTradeFetch,
getWalletsWithPnL,
getWalletsWithTrades,
} from './database';
// Set WebSocket as global for @nktkas/rews to use in Node.js environment
@@ -248,6 +255,7 @@ async function getCurrentPrices(coins: string[]): Promise<Map<string, number>> {
/**
* Enhance trades with current prices and calculate PnL per trade
* Uses the price cache instead of making API calls
* Works with trade_history table structure
*/
async function enhanceTradesWithCurrentPrices(trades: any[]): Promise<any[]> {
if (trades.length === 0) {
@@ -259,12 +267,29 @@ async function enhanceTradesWithCurrentPrices(trades: any[]): Promise<any[]> {
const coinUpper = trade.coin?.toUpperCase() || '';
const currentPrice = coinUpper ? (priceCache.get(coinUpper) || null) : null;
// Check if trade is closed (has close_price)
const isClosed = trade.close_price !== null && trade.close_price !== undefined;
// Calculate PnL per trade
let tradePnl: number | null = null;
if (currentPrice !== null && trade.entry_price) {
// If closed, use closed_pnl from API if available (most accurate)
if (isClosed && trade.closed_pnl !== null && trade.closed_pnl !== undefined) {
tradePnl = parseFloat(String(trade.closed_pnl)) || null;
} else if (isClosed && trade.close_price && trade.entry_price) {
// Fallback: calculate from entry/close prices for closed trades
const entryPrice = parseFloat(String(trade.entry_price)) || 0;
const amount = parseFloat(String(trade.amount)) || 0;
const direction = trade.direction || 'buy';
const closePrice = parseFloat(String(trade.close_price)) || 0;
const amount = parseFloat(String(trade.amount || trade.size)) || 0;
if (entryPrice > 0 && closePrice > 0 && amount > 0) {
tradePnl = (closePrice - entryPrice) * amount;
}
} else if (!isClosed && currentPrice !== null && trade.entry_price) {
// For open trades, calculate unrealized PnL using current price
const entryPrice = parseFloat(String(trade.entry_price)) || 0;
const amount = parseFloat(String(trade.amount || trade.size)) || 0;
const direction = trade.direction || trade.side || 'buy';
if (entryPrice > 0 && amount > 0 && currentPrice > 0) {
if (direction === 'buy') {
@@ -584,9 +609,9 @@ async function getWalletPnL(wallet: string, closedOnly: boolean = false): Promis
// Get all trades for this wallet
const trades = await getTradesForWallet(wallet);
// Filter trades if closedOnly is true
// Filter trades if closedOnly is true (check close_price instead of close_date)
const filteredTrades = closedOnly
? trades.filter((trade: any) => trade.close_date !== null)
? trades.filter((trade: any) => trade.close_price !== null && trade.close_price !== undefined)
: trades;
let realizedPnl = 0;
@@ -615,14 +640,23 @@ async function getWalletPnL(wallet: string, closedOnly: boolean = false): Promis
// Calculate PnL from trades
for (const trade of filteredTrades) {
const amount = parseFloat(trade.amount) || 0;
const amount = parseFloat(trade.amount || trade.size) || 0;
const entryPrice = parseFloat(trade.entry_price) || 0;
if (trade.close_date && trade.close_price) {
// Closed trade - calculate realized PnL: (close_price - entry_price) * amount
const closePrice = parseFloat(trade.close_price) || 0;
const tradePnl = (closePrice - entryPrice) * amount;
realizedPnl += tradePnl;
// Check if trade is closed (close_price IS NOT NULL or closed_pnl IS NOT NULL)
const isClosed = trade.close_price !== null && trade.close_price !== undefined;
if (isClosed) {
// Closed trade - use closed_pnl from API if available, otherwise calculate
if (trade.closed_pnl !== null && trade.closed_pnl !== undefined) {
// Use closed_pnl from API (most accurate)
realizedPnl += parseFloat(trade.closed_pnl) || 0;
} else if (trade.close_price && entryPrice > 0) {
// Fallback: calculate from entry/close prices
const closePrice = parseFloat(trade.close_price) || 0;
const tradePnl = (closePrice - entryPrice) * amount;
realizedPnl += tradePnl;
}
closedTradesCount++;
} else if (!closedOnly) {
// Open trade - calculate unrealized PnL using current price (only if not closedOnly)
@@ -763,6 +797,342 @@ async function fetchAndUpdateAllPnL(): Promise<(WalletPnL | null)[]> {
return walletPnLs;
}
/**
* Fetch historical trade history for a wallet from Hyperliquid API
*/
async function fetchHistoricalTradesForWallet(walletAddress: string): Promise<number> {
try {
if (!walletAddress || walletAddress.length !== 42 || !walletAddress.startsWith('0x')) {
return 0;
}
console.log(`[HISTORICAL] Fetching trade history for wallet: ${walletAddress.substring(0, 10)}...`);
// Get latest trade timestamp to fetch only new trades (incremental fetching)
const latestTimestamp = await getLatestTradeTimestampForWallet(walletAddress);
const startTime = latestTimestamp ? Math.floor(latestTimestamp.getTime() / 1000) : undefined;
// Try to fetch user fills/historical trades from Hyperliquid API
// Note: The exact method name may vary - adjust based on @nktkas/hyperliquid SDK
let fills: any[] = [];
try {
// Try userFills method (common in Hyperliquid API)
if ((client as any).userFills) {
const response = await (client as any).userFills({ user: walletAddress, startTime });
fills = Array.isArray(response) ? response : response?.fills || response?.data || [];
}
// Try userFillsByTime if available
else if ((client as any).userFillsByTime) {
const response = await (client as any).userFillsByTime({ user: walletAddress, startTime });
fills = Array.isArray(response) ? response : response?.fills || response?.data || [];
}
// Try historicalOrders if available
else if ((client as any).historicalOrders) {
const response = await (client as any).historicalOrders({ user: walletAddress, startTime });
fills = Array.isArray(response) ? response : response?.orders || response?.fills || response?.data || [];
}
// If none of the methods are available, log error
else {
console.error(`[HISTORICAL] No suitable method found on InfoClient to fetch user fills. Available methods may vary.`);
console.error(`[HISTORICAL] Tried: userFills, userFillsByTime, historicalOrders`);
return 0;
}
} catch (apiError: any) {
// If it's a rate limit error, log and return
if (apiError?.message?.includes('rate limit') || apiError?.status === 429) {
console.log(`[HISTORICAL] Rate limit hit for wallet ${walletAddress.substring(0, 10)}...`);
throw apiError; // Re-throw to let caller handle rate limiting
}
console.error(`[HISTORICAL] Error fetching trades for ${walletAddress.substring(0, 10)}...:`, apiError?.message || apiError);
return 0;
}
if (!Array.isArray(fills) || fills.length === 0) {
console.log(`[HISTORICAL] No new trades found for wallet: ${walletAddress.substring(0, 10)}...`);
await updateLastTradeFetchTime(walletAddress);
return 0;
}
// Get existing trade history to calculate current positions
const existingTrades = await getTradeHistoryForWallet(walletAddress);
// Track positions per coin: { coin: { size: number, entryPrice: number } }
const positions = new Map<string, { size: number; entryPrice: number; entryValue: number }>();
// Process existing trades to calculate current positions
for (const trade of existingTrades) {
const coin = trade.coin.toUpperCase();
const size = parseFloat(trade.size) || 0;
const fillPrice = parseFloat(trade.price) || 0;
if (!positions.has(coin)) {
positions.set(coin, { size: 0, entryPrice: 0, entryValue: 0 });
}
const position = positions.get(coin)!;
const absSize = Math.abs(size);
if (trade.side === 'buy') {
// Adding to position
const newSize = position.size + absSize;
const newValue = position.entryValue + (absSize * fillPrice);
position.size = newSize;
position.entryValue = newValue;
position.entryPrice = newSize > 0 ? newValue / newSize : 0;
} else if (trade.side === 'sell') {
// Reducing position
if (absSize >= position.size) {
// Fully closing position
position.size = 0;
position.entryPrice = 0;
position.entryValue = 0;
} else {
// Partially closing position
position.size = position.size - absSize;
position.entryValue = position.size * position.entryPrice;
}
}
}
// Process and save each new trade with position tracking
let savedCount = 0;
for (const fill of fills) {
try {
// Extract trade data - adjust fields based on Hyperliquid API response format
const tradeId = fill.id || fill.oid || fill.tradeId || `${fill.time || Date.now()}-${fill.px || ''}-${fill.sz || ''}`;
const coin = fill.coin || fill.symbol || fill.market || '';
// Fix side detection: 'B' or 'b' = buy (Bid), 'A' or 'a' = sell (Ask)
let side: 'buy' | 'sell' = 'buy'; // default
if (fill.side === 'B' || fill.side === 'b') {
side = 'buy';
} else if (fill.side === 'A' || fill.side === 'a') {
side = 'sell';
} else if (fill.isLong === true || fill.isLong === false) {
// If isLong is explicitly set, use it
side = fill.isLong ? 'buy' : 'sell';
} else {
// Fallback: check if closedPnl suggests a close (but don't use it for side)
// Try to infer from size or other fields
const sizeNum = parseFloat(fill.sz || fill.size || '0');
// If size is negative, it might indicate sell
if (sizeNum < 0) {
side = 'sell';
}
}
const size = fill.sz || fill.size || fill.amount || '0';
const price = fill.px || fill.price || '0';
const fee = fill.fee || fill.commission || '0';
const timestamp = fill.time ? new Date(fill.time > 1e12 ? fill.time : fill.time * 1000) : new Date();
const orderType = fill.orderType || fill.type || null;
const status = fill.status || 'filled';
const coinUpper = coin.toUpperCase();
const sizeNum = Math.abs(parseFloat(size));
const priceNum = parseFloat(price);
// Extract API fields for position tracking
const closedPnl = fill.closedPnl ? parseFloat(fill.closedPnl) : null;
const dir = fill.dir || ''; // e.g., "Close Long"
const startPosition = fill.startPosition ? parseFloat(fill.startPosition) : null;
const isClosing = dir.toLowerCase().includes('close');
// Track positions and calculate entry/close prices
if (!positions.has(coinUpper)) {
positions.set(coinUpper, { size: 0, entryPrice: 0, entryValue: 0 });
}
const position = positions.get(coinUpper)!;
let entryPrice: number | null = null;
let closePrice: number | null = null;
// If API provides closedPnl, use it to calculate entry_price
if (isClosing && closedPnl !== null && closedPnl !== undefined && sizeNum > 0) {
// This is a closing trade with PnL
closePrice = priceNum;
// Calculate entry_price from closedPnl
// For closing long: PnL = (close_price - entry_price) * size
// So: entry_price = close_price - (closedPnl / size)
if (dir.toLowerCase().includes('long')) {
entryPrice = priceNum - (closedPnl / sizeNum);
} else if (dir.toLowerCase().includes('short')) {
// For closing short: PnL = (entry_price - close_price) * size
// So: entry_price = close_price + (closedPnl / size)
entryPrice = priceNum + (closedPnl / sizeNum);
} else {
// Default to long calculation
entryPrice = priceNum - (closedPnl / sizeNum);
}
// Update position tracking
if (startPosition !== null && startPosition !== undefined) {
// Use startPosition to update position
if (sizeNum >= position.size) {
// Fully closing position
position.size = 0;
position.entryPrice = 0;
position.entryValue = 0;
} else {
// Partially closing position
position.size = position.size - sizeNum;
position.entryValue = position.size * position.entryPrice;
}
} else if (sizeNum >= position.size) {
// Fully closing position
position.size = 0;
position.entryPrice = 0;
position.entryValue = 0;
} else {
// Partially closing position
position.size = position.size - sizeNum;
position.entryValue = position.size * position.entryPrice;
}
} else if (side === 'buy') {
// Opening or adding to position
const wasZero = position.size === 0;
const newSize = position.size + sizeNum;
const newValue = position.entryValue + (sizeNum * priceNum);
// If opening a new position (was zero), this is the entry price
if (wasZero) {
entryPrice = priceNum;
} else {
// Use weighted average entry price
entryPrice = newValue / newSize;
}
position.size = newSize;
position.entryValue = newValue;
position.entryPrice = entryPrice;
} else if (side === 'sell') {
// Closing or reducing position
if (position.size > 0) {
// Position exists, this is closing (partially or fully)
closePrice = priceNum;
// Use the current position's entry price for PnL calculation
entryPrice = position.entryPrice > 0 ? position.entryPrice : null;
if (sizeNum >= position.size) {
// Fully closing position
position.size = 0;
position.entryPrice = 0;
position.entryValue = 0;
} else {
// Partially closing position
position.size = position.size - sizeNum;
position.entryValue = position.size * position.entryPrice;
}
} else {
// No position, this might be a short or closing a short (we don't track shorts)
// For now, just use the fill price
closePrice = null;
entryPrice = null;
}
}
// Save trade history with entry/close prices and closedPnl
const saved = await saveTradeHistoryToDB(
walletAddress,
String(tradeId),
coinUpper,
side,
size,
price,
timestamp,
fee,
orderType,
status,
fill, // Store full fill data in extra_data
entryPrice ? String(entryPrice) : null,
closePrice ? String(closePrice) : null,
closedPnl !== null && closedPnl !== undefined ? String(closedPnl) : null
);
if (saved) {
savedCount++;
}
} catch (tradeError) {
console.error(`[HISTORICAL] Error saving trade for ${walletAddress.substring(0, 10)}...:`, tradeError);
}
}
// Update last fetch time
await updateLastTradeFetchTime(walletAddress);
console.log(`[HISTORICAL] ✅ Fetched and saved ${savedCount} trades for wallet: ${walletAddress.substring(0, 10)}...`);
return savedCount;
} catch (error: any) {
console.error(`[HISTORICAL] Error fetching historical trades for ${walletAddress.substring(0, 10)}...:`, error?.message || error);
// Don't update last fetch time on error (so we can retry)
return 0;
}
}
/**
* Rate-limited background job to fetch historical trades for all wallets
* Respects rate limits by processing wallets sequentially with delays
*/
async function startHistoricalTradeFetcher(): Promise<void> {
console.log('[HISTORICAL FETCHER] Starting historical trade fetcher...');
// Rate limiting configuration
const RATE_LIMIT_REQUESTS_PER_MINUTE = 20; // Conservative limit
const RATE_LIMIT_DELAY_MS = Math.ceil(60000 / RATE_LIMIT_REQUESTS_PER_MINUTE); // Delay between requests
const BATCH_SIZE = 10; // Process wallets in batches
async function fetchBatch(): Promise<void> {
try {
// Get wallets that need fetching (prioritized by last fetch time)
const walletsToFetch = await getWalletsNeedingTradeFetch(BATCH_SIZE);
if (walletsToFetch.length === 0) {
console.log('[HISTORICAL FETCHER] No wallets need fetching right now');
return;
}
console.log(`[HISTORICAL FETCHER] Processing batch of ${walletsToFetch.length} wallets...`);
// Process wallets sequentially with rate limiting
for (const wallet of walletsToFetch) {
try {
await fetchHistoricalTradesForWallet(wallet);
// Wait between requests to respect rate limits
await new Promise(resolve => setTimeout(resolve, RATE_LIMIT_DELAY_MS));
} catch (error: any) {
// If rate limited, wait longer before continuing
if (error?.message?.includes('rate limit') || error?.status === 429) {
console.log('[HISTORICAL FETCHER] Rate limit hit, waiting 60 seconds...');
await new Promise(resolve => setTimeout(resolve, 60000)); // Wait 1 minute
} else {
console.error(`[HISTORICAL FETCHER] Error fetching for ${wallet.substring(0, 10)}...:`, error?.message || error);
}
}
}
console.log(`[HISTORICAL FETCHER] ✅ Batch complete`);
} catch (error) {
console.error('[HISTORICAL FETCHER] Error in fetch batch:', error);
}
}
// Run initial fetch
fetchBatch();
// Then run every 5 minutes
const FETCH_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
setInterval(() => {
fetchBatch().catch(error => {
console.error('[HISTORICAL FETCHER] Error in scheduled fetch:', error);
});
}, FETCH_INTERVAL_MS);
console.log(`[HISTORICAL FETCHER] ✅ Historical trade fetcher started (runs every 5 minutes)`);
}
/**
* Check if a trade event represents a closing event (position closure)
* This function determines if a trade is closing a position
@@ -802,12 +1172,12 @@ function isOpeningEvent(tradeData: any): boolean {
}
/**
* Start tracking trade events (both opening and closing) to automatically collect wallet addresses
* Listens to all trade events and processes both opens and closes
* Start tracking trade events to automatically collect wallet addresses
* Only stores wallets - trades will be fetched via historical API
*/
async function startTradeTracking(): Promise<void> {
try {
console.log('[TRACKING] Starting trade tracking (opens and closes)...');
console.log('[TRACKING] Starting wallet tracking (trades will be fetched via historical API)...');
console.log('[TRACKING] WebSocket transport:', wsTransport);
// Wait for WebSocket connection to be ready
@@ -825,13 +1195,13 @@ async function startTradeTracking(): Promise<void> {
throw error;
}
// Subscribe to all trades and process both opens and closes
console.log(`[TRACKING] Subscribing to all trade events (opens and closes)...`);
// Subscribe to all trades to collect wallet addresses only
console.log(`[TRACKING] Subscribing to trade events to collect wallet addresses...`);
try {
console.log(`[TRACKING] Attempting to subscribe to all trade events...`);
console.log(`[TRACKING] Attempting to subscribe to trade events...`);
// Subscribe to trades for multiple coins - process both opens and closes
// Subscribe to trades for multiple coins - only collect wallet addresses
const coinsToTrack = ['ETH', 'BTC', 'SOL', 'ARB', 'AVAX', 'BNB', 'MATIC', 'ADA', 'DOGE', 'XRP'];
console.log(`[TRACKING] Subscribing to trades for: ${coinsToTrack.join(', ')}`);
@@ -895,28 +1265,24 @@ async function startTradeTracking(): Promise<void> {
const uniqueWallets = [...new Set(foundWallets.map(w => w.toLowerCase()))];
if (uniqueWallets.length === 0) {
console.log(`[TRADE] ⚠️ No wallet addresses found in trade event`);
return;
return; // No wallet addresses found, skip
}
// Process wallets in parallel - save all trades as buy/sell
// Only add wallets to tracking - trades will be fetched via historical API
Promise.all(uniqueWallets.map(async (w) => {
try {
// Add wallet to tracking (both in-memory and database)
await addWallet(w);
// Process and save trade (buy or sell)
await processAndSaveTrade(t, coin, w);
} catch (error) {
console.error(`[TRADE] Error processing wallet ${w}:`, error);
console.error(`[TRADE] Error adding wallet ${w}:`, error);
}
})).catch(error => {
console.error('[TRADE] Error in Promise.all for wallets:', error);
});
});
} catch (error) {
console.error('[TRADE CLOSE] Error processing closing event:', error);
console.error('[TRADE CLOSE] Error stack:', (error as Error).stack);
console.error('[TRADE] Error processing trade event:', error);
console.error('[TRADE] Error stack:', (error as Error).stack);
}
});
@@ -927,21 +1293,21 @@ async function startTradeTracking(): Promise<void> {
}
}
console.log('[TRACKING] ✅ Trade tracking initialization complete (buys and sells)');
console.log('[TRACKING] ✅ Wallet tracking initialization complete');
console.log(`[TRACKING] Currently tracking ${trackedWallets.size} wallets`);
} catch (error) {
console.error('[TRACKING] ❌ Fatal error starting trade tracking:', error);
console.error('[TRACKING] ❌ Fatal error starting wallet tracking:', error);
console.error('[TRACKING] Error stack:', (error as Error).stack);
}
} catch (error) {
console.error('[TRACKING] ❌ Fatal error starting trade tracking:', error);
console.error('[TRACKING] ❌ Fatal error starting wallet tracking:', error);
console.error('[TRACKING] Error stack:', (error as Error).stack);
}
}
/**
* API endpoint to get tracked wallets sorted by PnL
* No input required - uses automatically tracked wallets
* Optimized: Uses SQL JOINs and stored PnL instead of calculating in JavaScript
*/
app.get('/api/wallets/tracked', async (req: Request, res: Response) => {
try {
@@ -950,9 +1316,20 @@ app.get('/api/wallets/tracked', async (req: Request, res: Response) => {
// Check for closedOnly query parameter
const closedOnly = req.query.closedOnly === 'true' || req.query.closedOnly === '1';
// Get all wallets
const wallets = await loadWalletsFromDB();
if (wallets.length === 0) {
// Get limit parameter (default: 100, max: 1000)
const limitParam = req.query.limit as string;
let limit: number = 100; // default to 100
if (limitParam) {
const parsedLimit = parseInt(limitParam, 10);
if (!isNaN(parsedLimit) && parsedLimit > 0) {
limit = Math.min(parsedLimit, 1000); // cap at 1000
}
}
// Get wallets with PnL and trades using optimized SQL query (single query with JOINs)
const walletsWithTrades = await getWalletsWithTrades(closedOnly, undefined, limit);
if (walletsWithTrades.length === 0) {
return res.json({
success: true,
count: 0,
@@ -961,51 +1338,20 @@ app.get('/api/wallets/tracked', async (req: Request, res: Response) => {
});
}
// Calculate PnL for each wallet (with closedOnly filter if specified)
console.log(`[PnL] Calculating PnL from trades for ${wallets.length} wallets${closedOnly ? ' (closed trades only)' : ''}`);
let completed = 0;
const total = wallets.length;
const walletPnLs = await Promise.all(
wallets.map(async (wallet) => {
const pnlData = await getWalletPnL(wallet, closedOnly);
completed++;
console.log(`[PnL] Progress: ${completed}/${total} - ${wallet.substring(0, 14)}...`);
return pnlData;
})
);
const validWalletPnLs = walletPnLs.filter((w): w is WalletPnL => w !== null);
validWalletPnLs.sort((a, b) => {
const pnlA = parseFloat(a.pnl) || 0;
const pnlB = parseFloat(b.pnl) || 0;
return pnlB - pnlA;
});
// Add trades for each wallet (with current prices)
const walletsWithTrades = await Promise.all(
validWalletPnLs.map(async (wallet) => {
const trades = await getTradesForWallet(wallet.wallet);
// Filter out open trades if closedOnly is true
const filteredTrades = closedOnly
? trades.filter((trade: any) => trade.close_date !== null)
: trades;
// Enhance trades with current prices
const tradesWithPrices = await enhanceTradesWithCurrentPrices(filteredTrades);
return {
...wallet,
trades: tradesWithPrices,
};
})
// Enhance trades with current prices (only for display, PnL already calculated from stored closed_pnl)
const enhancedWallets = await Promise.all(
walletsWithTrades.map(async wallet => ({
...wallet,
trades: await enhanceTradesWithCurrentPrices(wallet.trades),
}))
);
res.json({
success: true,
count: validWalletPnLs.length,
totalTracked: wallets.length,
count: enhancedWallets.length,
totalTracked: enhancedWallets.length,
closedOnly: closedOnly,
wallets: walletsWithTrades,
wallets: enhancedWallets,
});
} catch (error) {
console.error('Error in /api/wallets/tracked:', error);
@@ -1061,28 +1407,25 @@ app.post('/api/wallets/track', async (req: Request, res: Response) => {
/**
* API endpoint to get list of tracked wallet addresses (without PnL)
* Optimized: Uses SQL JOIN to get wallets with trades in single query
*/
app.get('/api/wallets/list', async (req: Request, res: Response) => {
try {
const wallets = await loadWalletsFromDB();
// Get wallets with trades using optimized SQL query
const walletsWithTrades = await getWalletsWithTrades(false);
// Add trades for each wallet (with current prices)
const walletsWithTrades = await Promise.all(
wallets.map(async (wallet) => {
const trades = await getTradesForWallet(wallet);
// Enhance trades with current prices
const tradesWithPrices = await enhanceTradesWithCurrentPrices(trades);
return {
address: wallet,
trades: tradesWithPrices,
};
})
// Format response (remove PnL data, keep only address and trades)
const enhancedWallets = await Promise.all(
walletsWithTrades.map(async wallet => ({
address: wallet.wallet,
trades: await enhanceTradesWithCurrentPrices(wallet.trades),
}))
);
res.json({
success: true,
count: wallets.length,
wallets: walletsWithTrades,
count: enhancedWallets.length,
wallets: enhancedWallets,
});
} catch (error) {
console.error('Error in GET /api/wallets/list:', error);
@@ -1115,47 +1458,23 @@ app.get('/api/wallets/pnl', async (req: Request, res: Response) => {
return res.status(400).json({ error: 'No valid wallet addresses provided.' });
}
console.log(`[PnL] Processing ${wallets.length} wallets for PnL calculation`);
let completed = 0;
const total = wallets.length;
console.log(`[PnL] Processing ${wallets.length} wallets using optimized SQL query`);
// Fetch PnL for all wallets in parallel
const walletPnLs = await Promise.all(
wallets.map(async (wallet) => {
const pnlData = await getWalletPnL(wallet);
completed++;
console.log(`[PnL] Progress: ${completed}/${total} - ${wallet.substring(0, 14)}...`);
return pnlData;
})
);
// Get wallets with PnL and trades using optimized SQL query (single query with JOINs)
const walletsWithTrades = await getWalletsWithTrades(false, wallets);
// Filter out null results (failed requests)
const validWalletPnLs = walletPnLs.filter((w): w is WalletPnL => w !== null);
// Sort by PnL (highest first) - convert string to number for sorting
validWalletPnLs.sort((a, b) => {
const pnlA = parseFloat(a.pnl) || 0;
const pnlB = parseFloat(b.pnl) || 0;
return pnlB - pnlA;
});
// Add trades for each wallet (with current prices)
const walletsWithTrades = await Promise.all(
validWalletPnLs.map(async (wallet) => {
const trades = await getTradesForWallet(wallet.wallet);
// Enhance trades with current prices
const tradesWithPrices = await enhanceTradesWithCurrentPrices(trades);
return {
...wallet,
trades: tradesWithPrices,
};
})
// Enhance trades with current prices
const enhancedWallets = await Promise.all(
walletsWithTrades.map(async wallet => ({
...wallet,
trades: await enhanceTradesWithCurrentPrices(wallet.trades),
}))
);
res.json({
success: true,
count: validWalletPnLs.length,
wallets: walletsWithTrades,
count: enhancedWallets.length,
wallets: enhancedWallets,
});
} catch (error) {
console.error('Error in GET /api/wallets/pnl:', error);
@@ -1176,47 +1495,23 @@ app.post('/api/wallets/pnl', async (req: Request, res: Response) => {
});
}
console.log(`[PnL] Processing ${wallets.length} wallets for PnL calculation`);
let completed = 0;
const total = wallets.length;
console.log(`[PnL] Processing ${wallets.length} wallets using optimized SQL query`);
// Fetch PnL for all wallets in parallel
const walletPnLs = await Promise.all(
wallets.map(async (wallet: string) => {
const pnlData = await getWalletPnL(wallet);
completed++;
console.log(`[PnL] Progress: ${completed}/${total} - ${wallet.substring(0, 14)}...`);
return pnlData;
})
);
// Get wallets with PnL and trades using optimized SQL query (single query with JOINs)
const walletsWithTrades = await getWalletsWithTrades(false, wallets);
// Filter out null results (failed requests)
const validWalletPnLs = walletPnLs.filter((w): w is WalletPnL => w !== null);
// Sort by PnL (highest first) - convert string to number for sorting
validWalletPnLs.sort((a, b) => {
const pnlA = parseFloat(a.pnl) || 0;
const pnlB = parseFloat(b.pnl) || 0;
return pnlB - pnlA;
});
// Add trades for each wallet (with current prices)
const walletsWithTrades = await Promise.all(
validWalletPnLs.map(async (wallet) => {
const trades = await getTradesForWallet(wallet.wallet);
// Enhance trades with current prices
const tradesWithPrices = await enhanceTradesWithCurrentPrices(trades);
return {
...wallet,
trades: tradesWithPrices,
};
})
// Enhance trades with current prices
const enhancedWallets = await Promise.all(
walletsWithTrades.map(async wallet => ({
...wallet,
trades: await enhanceTradesWithCurrentPrices(wallet.trades),
}))
);
res.json({
success: true,
count: validWalletPnLs.length,
wallets: walletsWithTrades,
count: enhancedWallets.length,
wallets: enhancedWallets,
});
} catch (error) {
console.error('Error in POST /api/wallets/pnl:', error);
@@ -1270,7 +1565,15 @@ app.listen(PORT, async () => {
// Run once at startup, then every 5 minutes
runClosedTradesDetectionCycle();
// Start trade tracking after server starts
// Start historical trade fetcher (fetches trade history for all wallets)
// Delay to let database initialize first
setTimeout(() => {
startHistoricalTradeFetcher().catch(error => {
console.error('[HISTORICAL FETCHER] Error starting historical trade fetcher:', error);
});
}, 2000);
// Start trade tracking after server starts (only collects wallets, not trades)
setTimeout(async () => {
await startTradeTracking();
}, 1000);