DATA/UPBIT/daemon/stats/daemon_upbit_stats_direction.php
#!/usr/bin/php
<?php
// ============================================================
// stats_direction_01 업데이트 데몬 (완전 수정 버전)
// - 트랜잭션 기반 안전한 처리
// - 마지막 처리 ID 영속 저장 (재시작 시 이어서 처리)
// - 부분 실패 시 안전한 롤백 및 재시도
// ============================================================

// Runtime configuration for a long-lived process: Seoul local time,
// no memory cap, and no execution time limit.
date_default_timezone_set('Asia/Seoul');
ini_set('memory_limit', '-1');
set_time_limit(0);

// This script is a daemon; refuse to run under any SAPI other than the CLI.
if (PHP_SAPI !== 'cli') {
    fwrite(STDERR, "[ERROR] CLI only\n");
    exit(1);
}

// Process-wide state read by the main loop further down in this file.
$keepRunning = true;          // cleared by the signal handler to request shutdown
$last_heartbeat_time = 0;     // unix time of the last heartbeat write (0 = never)
$PID = getmypid();
$DAEMON_ID = pathinfo(__FILE__, PATHINFO_FILENAME); // script file name without extension

// When the pcntl extension is present, SIGTERM and SIGINT only clear the
// run flag; the handler itself runs whenever pcntl_signal_dispatch() is called.
if (function_exists('pcntl_signal') && defined('SIGTERM') && defined('SIGINT')) {
    $request_stop = function() use (&$keepRunning) { $keepRunning = false; };
    foreach ([SIGTERM, SIGINT] as $signo) {
        pcntl_signal($signo, $request_stop);
    }
}

// ============================================================
// Logging
// ============================================================
/**
 * Write one timestamped log line.
 *
 * The line has the form "[Y-m-d H:i:s] message\n". Normal messages are
 * echoed to standard output; error messages go to STDERR. Output is
 * flushed after every line.
 *
 * @param mixed $msg      Message text (converted to string).
 * @param bool  $is_error When true, the line is written to STDERR.
 */
function log_message($msg, $is_error = false) {
    $line = '[' . date('Y-m-d H:i:s') . '] ' . $msg . "\n";

    if (!$is_error) {
        echo $line;
    } else {
        fwrite(STDERR, $line);
    }

    flush();
}

// ============================================================
// Daemon status record (including d_last_id)
// ============================================================
/**
 * Insert or refresh this daemon's row in upbit_data.daemon_record.
 *
 * A new row is inserted with category 'STATS_DIRECTION_01'; when the key
 * already exists, the PID, status and heartbeat time are updated instead.
 * d_last_id is overwritten only when $last_id is not null (COALESCE keeps
 * the stored value otherwise). Failures are logged and swallowed, so this
 * call never throws an Exception to the caller.
 *
 * @param object   $db      Connection exposing prepare() (a PDO handle in this file).
 * @param string   $id      Daemon identifier (d_id).
 * @param int      $pid     Process ID of the running daemon.
 * @param string   $status  Status label, e.g. 'RUNNING' or 'STOPPED'.
 * @param int|null $last_id Last processed source ID, or null to leave it unchanged.
 */
function update_daemon_status($db, $id, $pid, $status, $last_id = null) {
    $upsert = "INSERT INTO upbit_data.daemon_record 
                (d_id, d_category, d_pid, d_status, d_heartbeat, d_start_time, d_last_id)
                VALUES (:id, 'STATS_DIRECTION_01', :pid, :status, NOW(), NOW(), :last_id)
                ON DUPLICATE KEY UPDATE
                d_pid = :pid2, 
                d_status = :status2, 
                d_heartbeat = NOW(),
                d_last_id = COALESCE(:last_id2, d_last_id)";

    // Values used in both the INSERT and the UPDATE part are bound under
    // two distinct placeholder names (":pid" / ":pid2", ...).
    $bindings = [
        ':id'       => $id,
        ':pid'      => $pid,
        ':status'   => $status,
        ':last_id'  => $last_id,
        ':pid2'     => $pid,
        ':status2'  => $status,
        ':last_id2' => $last_id
    ];

    try {
        $db->prepare($upsert)->execute($bindings);
    } catch (Exception $failure) {
        log_message("Daemon Record 업데이트 실패: " . $failure->getMessage(), true);
    }
}

// ============================================================
// Look up the last processed ID
// ============================================================
/**
 * Read the persisted d_last_id for a daemon from upbit_data.daemon_record.
 *
 * @param object $db        Connection exposing prepare() (a PDO handle in this file).
 * @param string $daemon_id Daemon identifier (d_id) to look up.
 * @return int The stored ID as an integer, or 0 when there is no row, the
 *             stored value is empty/zero, or the lookup raised an Exception
 *             (which is logged).
 */
function get_last_processed_id($db, $daemon_id) {
    try {
        $lookup = $db->prepare("SELECT d_last_id FROM upbit_data.daemon_record WHERE d_id = ?");
        $lookup->execute([$daemon_id]);
        $stored = $lookup->fetchColumn();
    } catch (Exception $failure) {
        log_message("Last ID 조회 실패: " . $failure->getMessage(), true);
        return 0;
    }

    // fetchColumn() yields false when no row matched; any falsy value maps to 0.
    if (!$stored) {
        return 0;
    }
    return (int)$stored;
}

// ============================================================
// Database connection
// ============================================================
/**
 * Load the PDO connection defined by /home/www/DB/db_upbit.php.
 *
 * The config file is included in this function's scope and is expected to
 * define a PDO instance as either $db_upbit or $pdo ($db_upbit wins when
 * both exist). The chosen handle is switched to exception error mode and
 * associative fetch mode before it is returned.
 *
 * @return PDO|null The configured connection, or null when the config file
 *                  is missing or defines no usable PDO instance.
 */
function loadDBConnection() {
    if (!file_exists('/home/www/DB/db_upbit.php')) {
        return null;
    }

    include '/home/www/DB/db_upbit.php';

    // Pick whichever of the two conventional variable names holds a PDO handle.
    $primary  = $db_upbit ?? null;
    $fallback = $pdo ?? null;

    if ($primary instanceof PDO) {
        $connection = $primary;
    } elseif ($fallback instanceof PDO) {
        $connection = $fallback;
    } else {
        return null;
    }

    $connection->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    $connection->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);

    return $connection;
}

// ============================================================
// Main loop
//
// Each pass: (re)connect if needed, restore the resume point, write a
// heartbeat, load the tracked markets, then drain new rows from
// daemon_upbit_coin_1s in batches of 100, folding every row into that
// market's stats_direction_01 record inside its own transaction.
// ============================================================
$db_upbit = null;
$last_processed_id = 0;
$is_initialized = false;

log_message("데몬 시작 (ID: {$DAEMON_ID}, PID: {$PID})");

while ($keepRunning) {
    try {
        // (Re)establish the DB connection.
        if ($db_upbit === null) {
            $db_upbit = loadDBConnection();
            if ($db_upbit === null) {
                log_message("DB 연결 실패, 5초 후 재시도", true);
                sleep(5);
                continue;
            }
            $is_initialized = false;
            log_message("DB 연결 성공");
        }

        // Restore the persisted resume point BEFORE the first heartbeat.
        // The heartbeat writes $last_processed_id into d_last_id, so sending
        // it first would overwrite the stored value with the initial 0.
        // max() keeps the in-memory cursor after a reconnect, when it may be
        // ahead of what was last persisted.
        if (!$is_initialized) {
            $last_processed_id = max(
                $last_processed_id,
                get_last_processed_id($db_upbit, $DAEMON_ID)
            );
            $is_initialized = true;
            log_message("마지막 처리 ID 복구: {$last_processed_id}");
        }

        // Heartbeat (at most once per second).
        if (time() - $last_heartbeat_time >= 1) {
            update_daemon_status($db_upbit, $DAEMON_ID, $PID, 'RUNNING', $last_processed_id);
            $last_heartbeat_time = time();
        }

        // Load the markets that have a statistics row.
        $target_markets = $db_upbit->query("SELECT market FROM stats_direction_01")
                                   ->fetchAll(PDO::FETCH_COLUMN);

        if (empty($target_markets)) {
            usleep(500000);
            continue;
        }

        // Fetch and process source rows.
        $processed_count = 0;

        // The market list is fixed for this pass, so the query text is built once.
        $placeholders = implode(',', array_fill(0, count($target_markets), '?'));
        $sql = "SELECT id, market, trade_price, change_rate, tr_ask_bid, tr_trade_volume
                FROM daemon_upbit_coin_1s
                WHERE id > ? AND market IN ($placeholders)
                ORDER BY id ASC
                LIMIT 100";

        while (true) {
            $stmt = $db_upbit->prepare($sql);
            $stmt->execute(array_merge([$last_processed_id], $target_markets));
            $rows = $stmt->fetchAll(PDO::FETCH_ASSOC);

            if (empty($rows)) break;

            // Every row is applied in its own transaction.
            foreach ($rows as $row) {
                $current_id = (int)$row['id'];
                $market = $row['market'];

                try {
                    $db_upbit->beginTransaction();

                    // 1. Read and lock the current statistics row (FOR UPDATE).
                    $s_sql = "SELECT change_rate, chg_dir_sum, chg_up_streak, chg_down_streak,
                                     chg_up_streak_max, chg_down_streak_max, chg_rate_avg, chg_rate_avg_cnt,
                                     side_plus, side_minus, side_total, side_total_cnt,
                                     acc_bid_volume, acc_ask_volume, side_avg
                              FROM stats_direction_01
                              WHERE market = ?
                              FOR UPDATE";

                    $s_stmt = $db_upbit->prepare($s_sql);
                    $s_stmt->execute([$market]);
                    $old = $s_stmt->fetch();

                    if (!$old) {
                        $db_upbit->rollBack();
                        log_message("마켓 {$market} 통계 레코드 없음, 스킵 (ID: {$current_id})", true);
                        $last_processed_id = $current_id; // skipped rows still advance the cursor
                        continue;
                    }

                    // 2. Compute the new statistics.
                    $price    = (double)$row['trade_price'];
                    $rate_now = (double)$row['change_rate'];
                    $side     = $row['tr_ask_bid'];
                    $dir      = ($side === 'BID') ? 1 : -1; // anything other than 'BID' counts as the ask side
                    $vol      = (double)$row['tr_trade_volume'];
                    $old_rate = (double)($old['change_rate'] ?? 0);

                    // Change-rate direction: +1 rising, -1 falling, 0 unchanged versus the stored rate.
                    $chg_dir = ($rate_now > $old_rate ? 1 : ($rate_now < $old_rate ? -1 : 0));
                    $chg_dir_sum = (int)($old['chg_dir_sum'] ?? 0) + $chg_dir;
                    // A streak grows only while its direction repeats; otherwise it resets to 0.
                    $chg_up = ($chg_dir == 1 ? (int)($old['chg_up_streak'] ?? 0) + 1 : 0);
                    $chg_down = ($chg_dir == -1 ? (int)($old['chg_down_streak'] ?? 0) + 1 : 0);
                    $chg_up_max = max((int)($old['chg_up_streak_max'] ?? 0), $chg_up);
                    $chg_down_max = max((int)($old['chg_down_streak_max'] ?? 0), $chg_down);

                    // Running mean of the change rate.
                    $old_chg_avg_cnt = (int)($old['chg_rate_avg_cnt'] ?? 0);
                    $new_chg_avg_cnt = $old_chg_avg_cnt + 1;
                    $old_chg_avg = (double)($old['chg_rate_avg'] ?? $rate_now);
                    $new_chg_avg = (($old_chg_avg * $old_chg_avg_cnt) + $rate_now) / $new_chg_avg_cnt;

                    // Trade-side counters and volume totals.
                    $side_plus = (int)($old['side_plus'] ?? 0) + ($dir == 1 ? 1 : 0);
                    $side_minus = (int)($old['side_minus'] ?? 0) + ($dir == -1 ? 1 : 0);
                    $side_total = (int)($old['side_total'] ?? 0) + $dir;

                    // Running mean of the trade price.
                    $old_side_total_cnt = (int)($old['side_total_cnt'] ?? 0);
                    $new_side_total_cnt = $old_side_total_cnt + 1;
                    $old_side_avg = (double)($old['side_avg'] ?? $price);
                    $new_side_avg = (($old_side_avg * $old_side_total_cnt) + $price) / $new_side_total_cnt;

                    $acc_bid_vol = (double)($old['acc_bid_volume'] ?? 0) + ($dir == 1 ? $vol : 0);
                    $acc_ask_vol = (double)($old['acc_ask_volume'] ?? 0) + ($dir == -1 ? $vol : 0);
                    $net_vol = $acc_bid_vol - $acc_ask_vol;

                    // 3. Write the row back (wr_datetime records the update time).
                    $u_sql = "UPDATE stats_direction_01 SET
                                trade_price      = :price,
                                change_rate      = :rate_now,
                                side_plus        = :side_plus,
                                side_minus       = :side_minus,
                                side_total       = :side_total,
                                side_plus_cnt    = :plus_display,
                                side_minus_cnt   = :minus_display,
                                side_avg         = :side_avg,
                                side_total_cnt   = :side_total_cnt,
                                acc_bid_volume   = :bid_vol,
                                acc_ask_volume   = :ask_vol,
                                net_vol          = :net_vol,
                                chg_dir          = :chg_dir,
                                chg_dir_sum      = :chg_dir_sum,
                                chg_up_streak    = :chg_up,
                                chg_down_streak  = :chg_down,
                                chg_up_streak_max = :chg_up_max,
                                chg_down_streak_max = :chg_down_max,
                                chg_rate_avg     = :chg_avg,
                                chg_rate_avg_cnt = :chg_avg_cnt,
                                wr_datetime      = NOW()
                              WHERE market = :market";

                    $u_stmt = $db_upbit->prepare($u_sql);
                    $u_stmt->execute([
                        ':price'         => $price,
                        ':rate_now'      => $rate_now,
                        ':side_plus'     => $side_plus,
                        ':side_minus'    => $side_minus,
                        ':side_total'    => $side_total,
                        ':plus_display'  => ($dir == 1 ? 1 : 0),
                        ':minus_display' => ($dir == -1 ? -1 : 0),
                        ':side_avg'      => $new_side_avg,
                        ':side_total_cnt'=> $new_side_total_cnt,
                        ':bid_vol'       => $acc_bid_vol,
                        ':ask_vol'       => $acc_ask_vol,
                        ':net_vol'       => $net_vol,
                        ':chg_dir'       => $chg_dir,
                        ':chg_dir_sum'   => $chg_dir_sum,
                        ':chg_up'        => $chg_up,
                        ':chg_down'      => $chg_down,
                        ':chg_up_max'    => $chg_up_max,
                        ':chg_down_max'  => $chg_down_max,
                        ':chg_avg'       => $new_chg_avg,
                        ':chg_avg_cnt'   => $new_chg_avg_cnt,
                        ':market'        => $market
                    ]);

                    $db_upbit->commit();

                    // Advance the cursor only after a successful commit.
                    $last_processed_id = $current_id;
                    $processed_count++;

                } catch (Exception $e) {
                    // A lost connection (MySQL client errors 2006 / 2013) is not a
                    // problem with this row. Hand it to the outer handler, which
                    // reconnects, and leave the cursor where it is so the row is
                    // retried instead of being skipped.
                    $driver_code = ($e instanceof PDOException && isset($e->errorInfo[1]))
                        ? (int)$e->errorInfo[1]
                        : 0;
                    if ($driver_code === 2006 || $driver_code === 2013) {
                        throw $e;
                    }

                    if ($db_upbit->inTransaction()) {
                        $db_upbit->rollBack();
                    }
                    log_message("처리 실패 (ID: {$current_id}, Market: {$market}): " . $e->getMessage(), true);

                    // Skip the failed row and move on, so one bad row cannot stall the loop.
                    $last_processed_id = $current_id;
                } finally {
                    // Runs on every path, including the "no stats row" continue above.
                    if (function_exists('pcntl_signal_dispatch')) {
                        pcntl_signal_dispatch();
                    }
                }

                if (!$keepRunning) break;
            }

            // Report the batch and persist the cursor right away.
            if ($processed_count > 0) {
                log_message("배치 처리 완료: {$processed_count}건, 마지막 ID: {$last_processed_id}");
                $processed_count = 0;

                update_daemon_status($db_upbit, $DAEMON_ID, $PID, 'RUNNING', $last_processed_id);
            }

            if (count($rows) < 100) break; // short batch: the backlog is drained
            if (!$keepRunning) break;
        }

        usleep(500000); // wait 0.5 s before polling again

    } catch (PDOException $e) {
        log_message("DB 에러: " . $e->getMessage(), true);
        $db_upbit = null; // forces a reconnect on the next pass
        sleep(5);
    } catch (Exception $e) {
        log_message("일반 에러: " . $e->getMessage(), true);
        sleep(1);
    } finally {
        // Dispatch pending signals on every path. The "continue" statements
        // above would otherwise bypass this, leaving SIGTERM/SIGINT unhandled
        // while the DB is unreachable or the market list is empty.
        if (function_exists('pcntl_signal_dispatch')) {
            pcntl_signal_dispatch();
        }
    }
}

// Shutdown: record the final state if a connection is still available.
if ($db_upbit !== null) {
    update_daemon_status($db_upbit, $DAEMON_ID, $PID, 'STOPPED', $last_processed_id);
}
log_message("데몬 종료 (ID: {$DAEMON_ID}, 마지막 처리 ID: {$last_processed_id})");