파이썬

키움 증권 API를 이용하여 주식 자동 매매 프로그램 개발

by nanumi posted Sep 07, 2021
?

단축키

Prev이전 문서

Next다음 문서

ESC닫기

크게 작게 위로 아래로 댓글로 가기 인쇄

키움 증권 API를 이용하여 주식 자동 매매 프로그램 개발

 

 

https://github.com/kdseo/PyTrader/blob/master/Kiwoom.py

 

import sys
import os
from PyQt5.QtWidgets import *
from PyQt5.QAxContainer import *
from PyQt5.QtCore import *
from beautifultable import BeautifulTable
from PyQt5.QtTest import *

TR_REQ_TIME_INTERVAL = 0.2
class Kiwoom(QAxWidget):
    def __init__(self):
        self.app = QApplication(sys.argv)
        print("Ui_class 입니다.")
        super().__init__()
        # 이벤트 루프 관련 변수
        self.login_event_loop = QEventLoop()
        self.account_event_loop = QEventLoop()
        self.calculator_event_loop = QEventLoop()

        # 계좌 관련 변수
        self.account_number = None
        self.total_buy_money = None
        self.total_evaluation_money = None
        self.total_evaluation_profit_and_loss_money = None
        self.total_yield = None
        self.account_stock_dict = {}
        self.not_signed_account_dict = {}

        # 예수금 관련 변수
        self.deposit = None
        self.withdraw_deposit = None
        self.order_deposit = None

        # 종목 분석 관련 변수
        self.calculator_list = []

        # 화면 번호
        self.screen_my_account = "1000"
        self.screen_calculation_stock = "2000"

        # 초기 작업
        print("초기작업시작")
        self.create_kiwoom_instance()
        self.event_collection()  # 이벤트와 슬롯을 메모리에 먼저 생성.
        self.login()
        print("계좌정보시작")
        input()
        self.get_account_info()  # 계좌 번호만 얻어오기
        self.get_deposit_info()  # 예수금 관련된 정보 얻어오기
        self.get_account_evaluation_balance()  # 계좌평가잔고내역 얻어오기
        self.not_signed_account()  # 미체결내역 얻어오기
        #self.calculator()
        self.menu()

        print("스케쥴링시작")
###########[스케쥴링]############
        self.kiwoom = QAxWidget("KHOPENAPI.KHOpenAPICtrl.1")
        self.kiwoom.OnEventConnect.connect(self.kiwoom_OnEventConnect)  # 로그인 결과를 받을 콜백함수 연결
        self.get_ocx_instance() # OCX 방식을 파이썬에 사용할 수 있게 반환해 주는 함수 실행
        self.event_slots()
        self.signal_login_commConnect() #접속
        self.get_account_info()  # 계좌정보



###########[스케쥴링]############


    # COM 오브젝트 생성.
    def create_kiwoom_instance(self):
        # 레지스트리에 저장된 키움 openAPI 모듈 불러오기
        self.setControl("KHOPENAPI.KHOpenAPICtrl.1")

    def event_collection(self):
        self.OnEventConnect.connect(self.login_slot)  # 로그인 관련 이벤트
        self.OnReceiveTrData.connect(self.tr_slot)  # 트랜잭션 요청 관련 이벤트

    def login(self):
        self.dynamicCall("CommConnect()")  # 시그널 함수 호출.
        self.login_event_loop.exec_()

    def login_slot(self, err_code):
        if err_code == 0:
            print("로그인에 성공하였습니다.")
        else:
            os.system('cls')
            print("에러 내용 :", errors(err_code)[1])
            sys.exit(0)
        self.login_event_loop.exit()

    def get_account_info(self):
        account_list = self.dynamicCall("GetLoginInfo(QString)", "ACCNO")
        account_number = account_list.split(';')[0]
        self.account_number = account_number

    def menu(self):
        sel = ""
        while True:
            os.system('cls')
            print("1. 현재 로그인 상태 확인")
            print("2. 사용자 정보 조회")
            print("3. 예수금 조회")
            print("4. 계좌 잔고 조회")
            print("5. 미체결 내역 조회")
            print("Q. 프로그램 종료")
            sel = input("=> ")

            if sel == "Q" or sel == "q":
                sys.exit(0)

            if sel == "1":
                self.print_login_connect_state()
            elif sel == "2":
                self.print_my_info()
            elif sel == "3":
                self.print_get_deposit_info()
            elif sel == "4":
                self.print_get_account_evaulation_balance_info()
            elif sel == "5":
                self.print_not_signed_account()

    def print_login_connect_state(self):
        os.system('cls')
        isLogin = self.dynamicCall("GetConnectState()")
        if isLogin == 1:
            print("\n현재 계정은 로그인 상태입니다.")
        else:
            print("\n현재 계정은 로그아웃 상태입니다.")
        input()

    def print_my_info(self):
        os.system('cls')
        user_name = self.dynamicCall("GetLoginInfo(QString)", "USER_NAME")
        user_id = self.dynamicCall("GetLoginInfo(QString)", "USER_ID")
        account_count = self.dynamicCall(
            "GetLoginInfo(QString)", "ACCOUNT_CNT")

        print(f"\n이름 : {user_name}")
        print(f"ID : {user_id}")
        print(f"보유 계좌 수 : {account_count}")
        print(f"계좌번호 : {self.account_number}")
        input()

    def print_get_deposit_info(self):
        os.system('cls')
        print(f"\n예수금 : {self.deposit}")
        print(f"출금 가능 금액 : {self.withdraw_deposit}")
        print(f"주문 가능 금액 : {self.order_deposit}")
        input()

    def print_get_account_evaulation_balance_info(self):
        os.system('cls')
        print("\n<싱글 데이터>")
        print(f"총 매입 금액 : {self.total_buy_money}")
        print(f"총 평가 금액 : {self.total_evaluation_money}")
        print(f"총 평가 손익 금액 : {self.total_evaluation_profit_and_loss_money}")
        print(f"총 수익률 : {self.total_yield}%\n")

        table = self.make_table("계좌평가잔고내역요청")
        print("<멀티 데이터>")
        if len(self.account_stock_dict) == 0:
            print("보유한 종목이 없습니다!")
        else:
            print(f"보유 종목 수 : {len(self.account_stock_dict)}")
            print(table)
        input()

    def make_table(self, sRQName):
        table = BeautifulTable()
        table = BeautifulTable(maxwidth=150)

        if sRQName == "계좌평가잔고내역요청":
            for stock_code in self.account_stock_dict:
                stock = self.account_stock_dict[stock_code]
                stockList = []
                for key in stock:
                    output = None

                    if key == "종목명":
                        output = stock[key]
                    elif key == "수익률(%)":
                        output = str(stock[key]) + "%"
                    elif key == "보유수량" or key == "매매가능수량":
                        output = str(stock[key]) + ""
                    else:
                        output = str(stock[key]) + ""
                    stockList.append(output)
                table.rows.append(stockList)
            table.columns.header = ["종목명", "평가손익",
                                    "수익률", "매입가", "보유수량", "매매가능수량", "현재가"]
            table.rows.sort('종목명')

        elif sRQName == "실시간미체결요청":
            for stock_order_number in self.not_signed_account_dict:
                stock = self.not_signed_account_dict[stock_order_number]
                stockList = [stock_order_number]
                for key in stock:
                    output = None
                    if key == "주문가격" or key == "현재가":
                        output = str(stock[key]) + ""
                    elif '' in key:
                        output = str(stock[key]) + ""
                    elif key == "종목코드":
                        continue
                    else:
                        output = stock[key]
                    stockList.append(output)
                table.rows.append(stockList)
            table.columns.header = ["주문번호", "종목명", "주문구분", "주문가격", "주문수량",
                                    "미체결수량", "체결량", "현재가", "주문상태"]
            table.rows.sort('주문번호')
        return table

    def print_not_signed_account(self):
        os.system('cls')
        print()
        table = self.make_table("실시간미체결요청")
        if len(self.not_signed_account_dict) == 0:
            print("미체결 내역이 없습니다!")
        else:
            print(table)
        input()

    def get_deposit_info(self, nPrevNext=0):
        self.dynamicCall("SetInputValue(QString, QString)",
                         "계좌번호", self.account_number)
        self.dynamicCall("SetInputValue(QString, QString)", "비밀번호", " ")
        self.dynamicCall("SetInputValue(QString, QString)", "비밀번호입력매체구분", "00")
        self.dynamicCall("SetInputValue(QString, QString)", "조회구분", "2")
        self.dynamicCall("CommRqData(QString, QString, int, QString)",
                         "예수금상세현황요청", "opw00001", nPrevNext, self.screen_my_account)

        self.account_event_loop.exec_()

    def get_account_evaluation_balance(self, nPrevNext=0):
        self.dynamicCall("SetInputValue(QString, QString)",
                         "계좌번호", self.account_number)
        self.dynamicCall("SetInputValue(QString, QString)", "비밀번호", " ")
        self.dynamicCall("SetInputValue(QString, QString)", "비밀번호입력매체구분", "00")
        self.dynamicCall("SetInputValue(QString, QString)", "조회구분", "1")
        self.dynamicCall("CommRqData(QString, QString, int, QString)",
                         "계좌평가잔고내역요청", "opw00018", nPrevNext, self.screen_my_account)

        if not self.account_event_loop.isRunning():
            self.account_event_loop.exec_()

    def not_signed_account(self, nPrevNext=0):
        self.dynamicCall("SetInputValue(QString, QString)",
                         "계좌번호", self.account_number)
        self.dynamicCall("SetInputValue(QString, QString)", "전체종목구분", "0")
        self.dynamicCall("SetInputValue(QString, QString)", "매매구분", "0")
        self.dynamicCall("SetInputValue(QString, QString)", "체결구분", "1")
        self.dynamicCall("CommRqData(QString, QString, int, QString)",
                         "실시간미체결요청", "opt10075", nPrevNext, self.screen_my_account)

        if not self.account_event_loop.isRunning():
            self.account_event_loop.exec_()

    def tr_slot(self, sScrNo, sRQName, sTrCode, sRecordName, sPrevNext):
        if sRQName == "예수금상세현황요청":
            deposit = self.dynamicCall(
                "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "예수금")
            self.deposit = int(deposit)

            withdraw_deposit = self.dynamicCall(
                "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "출금가능금액")
            self.withdraw_deposit = int(withdraw_deposit)

            order_deposit = self.dynamicCall(
                "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "주문가능금액")
            self.order_deposit = int(order_deposit)
            self.cancel_screen_number(self.screen_my_account)
            self.account_event_loop.exit()

        elif sRQName == "계좌평가잔고내역요청":
            if (self.total_buy_money == None or self.total_evaluation_money == None
                    or self.total_evaluation_profit_and_loss_money == None or self.total_yield == None):
                total_buy_money = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "총매입금액")
                self.total_buy_money = int(total_buy_money)

                total_evaluation_money = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "총평가금액")
                self.total_evaluation_money = int(total_evaluation_money)

                total_evaluation_profit_and_loss_money = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "총평가손익금액")
                self.total_evaluation_profit_and_loss_money = int(
                    total_evaluation_profit_and_loss_money)

                total_yield = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "총수익률(%)")
                self.total_yield = float(total_yield)

            cnt = self.dynamicCall(
                "GetRepeatCnt(QString, QString)", sTrCode, sRQName)

            for i in range(cnt):
                stock_code = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "종목번호")
                stock_code = stock_code.strip()[1:]

                stock_name = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "종목명")
                stock_name = stock_name.strip()  # 필요 없는 공백 제거.

                stock_evaluation_profit_and_loss = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "평가손익")
                stock_evaluation_profit_and_loss = int(
                    stock_evaluation_profit_and_loss)

                stock_yield = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "수익률(%)")
                stock_yield = float(stock_yield)

                stock_buy_money = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "매입가")
                stock_buy_money = int(stock_buy_money)

                stock_quantity = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "보유수량")
                stock_quantity = int(stock_quantity)

                stock_trade_quantity = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "매매가능수량")
                stock_trade_quantity = int(stock_trade_quantity)

                stock_present_price = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "현재가")
                stock_present_price = int(stock_present_price)

                if not stock_code in self.account_stock_dict:
                    self.account_stock_dict[stock_code] = {}

                self.account_stock_dict[stock_code].update({'종목명': stock_name})
                self.account_stock_dict[stock_code].update(
                    {'평가손익': stock_evaluation_profit_and_loss})
                self.account_stock_dict[stock_code].update(
                    {'수익률(%)': stock_yield})
                self.account_stock_dict[stock_code].update(
                    {'매입가': stock_buy_money})
                self.account_stock_dict[stock_code].update(
                    {'보유수량': stock_quantity})
                self.account_stock_dict[stock_code].update(
                    {'매매가능수량': stock_trade_quantity})
                self.account_stock_dict[stock_code].update(
                    {'현재가': stock_present_price})

            if sPrevNext == "2":
                self.get_account_evaluation_balance(2)
            else:
                self.cancel_screen_number(self.screen_my_account)
                self.account_event_loop.exit()

        elif sRQName == "실시간미체결요청":
            cnt = self.dynamicCall(
                "GetRepeatCnt(QString, QString)", sTrCode, sRQName)

            for i in range(cnt):
                stock_code = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "종목코드")
                stock_code = stock_code.strip()

                stock_order_number = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "주문번호")
                stock_order_number = int(stock_order_number)

                stock_name = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "종목명")
                stock_name = stock_name.strip()

                stock_order_type = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "주문구분")
                stock_order_type = stock_order_type.strip().lstrip('+').lstrip('-')

                stock_order_price = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "주문가격")
                stock_order_price = int(stock_order_price)

                stock_order_quantity = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "주문수량")
                stock_order_quantity = int(stock_order_quantity)

                stock_not_signed_quantity = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "미체결수량")
                stock_not_signed_quantity = int(stock_not_signed_quantity)

                stock_signed_quantity = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "체결량")
                stock_signed_quantity = int(stock_signed_quantity)

                stock_present_price = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "현재가")
                stock_present_price = int(
                    stock_present_price.strip().lstrip('+').lstrip('-'))

                stock_order_status = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "주문상태")
                stock_order_status = stock_order_status.strip()

                if not stock_order_number in self.not_signed_account_dict:
                    self.not_signed_account_dict[stock_order_number] = {}

                self.not_signed_account_dict[stock_order_number].update(
                    {'종목코드': stock_code})
                self.not_signed_account_dict[stock_order_number].update(
                    {'종목명': stock_name})
                self.not_signed_account_dict[stock_order_number].update(
                    {'주문구분': stock_order_type})
                self.not_signed_account_dict[stock_order_number].update(
                    {'주문가격': stock_order_price})
                self.not_signed_account_dict[stock_order_number].update(
                    {'주문수량': stock_order_quantity})
                self.not_signed_account_dict[stock_order_number].update(
                    {'미체결수량': stock_not_signed_quantity})
                self.not_signed_account_dict[stock_order_number].update(
                    {'체결량': stock_signed_quantity})
                self.not_signed_account_dict[stock_order_number].update(
                    {'현재가': stock_present_price})
                self.not_signed_account_dict[stock_order_number].update(
                    {'주문상태': stock_order_status})

            if sPrevNext == "2":
                self.not_signed_account(2)
            else:
                self.cancel_screen_number(sScrNo)
                self.account_event_loop.exit()

        elif sRQName == "주식일봉차트조회요청":
            stock_code = self.dynamicCall(
                "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "종목코드")
            #six_hundred_data = self.dynamicCall("GetCommDataEx(QString, QString)", sTrCode, sRQName)

            stock_code = stock_code.strip()
            cnt = self.dynamicCall(
                "GetRepeatCnt(QString, QString)", sTrCode, sRQName)  # 최대 600
            for i in range(cnt):
                calculator_list = []

                current_price = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "현재가")
                volume = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "거래량")
                trade_price = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "거래대금")
                date = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "일자")
                start_price = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "시가")
                high_price = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "고가")
                low_price = self.dynamicCall(
                    "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "저가")

                calculator_list.append("")
                calculator_list.append(int(current_price))
                calculator_list.append(int(volume))
                calculator_list.append(int(trade_price))
                calculator_list.append(int(date))
                calculator_list.append(int(start_price))
                calculator_list.append(int(high_price))
                calculator_list.append(int(low_price))
                calculator_list.append("")

                self.calculator_list.append(calculator_list.copy())

            if sPrevNext == "2":
                self.day_kiwoom_db(stock_code, None, 2)
            else:
                self.calculator_event_loop.exit()

    def cancel_screen_number(self, sScrNo):
        self.dynamicCall("DisconnectRealData(QString)", sScrNo)

    def get_code_list_by_market(self, market_code):
        code_list = self.dynamicCall(
            "GetCodeListByMarket(QString)", market_code)
        code_list = code_list.split(";")[:-1]
        return code_list

    def calculator(self):
        kosdaq_list = self.get_code_list_by_market("10")

        for idx, stock_code in enumerate(kosdaq_list):
            self.dynamicCall("DisconnectRealData(QString)",
                             self.screen_calculation_stock)

            print(
                f"{idx + 1} / {len(kosdaq_list)} : KOSDAQ Stock Code : {stock_code} is updating...")
            self.day_kiwoom_db(stock_code)

    def day_kiwoom_db(self, stock_code=None, date=None, nPrevNext=0):
        QTest.qWait(3600)  # 3.6초마다 딜레이

        self.dynamicCall("SetInputValue(QString, QString)", "종목코드", stock_code)
        self.dynamicCall("SetInputValue(QString, QString)", "수정주가구분", 1)

        if date != None:  # date None일 경우 date는 오늘 날짜 기준
            self.dynamicCall("SetInputValue(QString, QString)", "기준일자", date)

        self.dynamicCall("CommRqData(QString, QString, int, QString)",
                         "주식일봉차트조회요청", "opt10081", nPrevNext, self.screen_calculation_stock)

        if not self.calculator_event_loop.isRunning():
            self.calculator_event_loop.exec_()

    def errors(err_code):

        err_dic = {0: ('OP_ERR_NONE', '정상처리'),
                   -10: ('OP_ERR_FAIL', '실패'),
                   -100: ('OP_ERR_LOGIN', '사용자정보교환실패'),
                   -101: ('OP_ERR_CONNECT', '서버접속실패'),
                   -102: ('OP_ERR_VERSION', '버전처리실패'),
                   -103: ('OP_ERR_FIREWALL', '개인방화벽실패'),
                   -104: ('OP_ERR_MEMORY', '메모리보호실패'),
                   -105: ('OP_ERR_INPUT', '함수입력값오류'),
                   -106: ('OP_ERR_SOCKET_CLOSED', '통신연결종료'),
                   -200: ('OP_ERR_SISE_OVERFLOW', '시세조회과부하'),
                   -201: ('OP_ERR_RQ_STRUCT_FAIL', '전문작성초기화실패'),
                   -202: ('OP_ERR_RQ_STRING_FAIL', '전문작성입력값오류'),
                   -203: ('OP_ERR_NO_DATA', '데이터없음'),
                   -204: ('OP_ERR_OVER_MAX_DATA', '조회가능한종목수초과'),
                   -205: ('OP_ERR_DATA_RCV_FAIL', '데이터수신실패'),
                   -206: ('OP_ERR_OVER_MAX_FID', '조회가능한FID수초과'),
                   -207: ('OP_ERR_REAL_CANCEL', '실시간해제오류'),
                   -300: ('OP_ERR_ORD_WRONG_INPUT', '입력값오류'),
                   -301: ('OP_ERR_ORD_WRONG_ACCTNO', '계좌비밀번호없음'),
                   -302: ('OP_ERR_OTHER_ACC_USE', '타인계좌사용오류'),
                   -303: ('OP_ERR_MIS_2BILL_EXC', '주문가격이20억원을초과'),
                   -304: ('OP_ERR_MIS_5BILL_EXC', '주문가격이50억원을초과'),
                   -305: ('OP_ERR_MIS_1PER_EXC', '주문수량이총발행주수의1 % 초과오류'),
                   -306: ('OP_ERR_MIS_3PER_EXC', '주문수량은총발행주수의3 % 초과오류'),
                   -307: ('OP_ERR_SEND_FAIL', '주문전송실패'),
                   -308: ('OP_ERR_ORD_OVERFLOW', '주문전송과부하'),
                   -309: ('OP_ERR_MIS_300CNT_EXC', '주문수량300계약초과'),
                   -310: ('OP_ERR_MIS_500CNT_EXC', '주문수량500계약초과'),
                   -340: ('OP_ERR_ORD_WRONG_ACCTINFO', '계좌정보없음'),
                   -500: ('OP_ERR_ORD_SYMCODE_EMPTY', '종목코드없음')
                   }

        result = err_dic[err_code]

        return result


######################## 시작
if __name__=="__main__":

    kiwoom = Kiwoom()

 

 

위의 소스를 가지고 하나씩 뜯어봐야겠다.

 

input() 함수로 사용자가 어떤 값을 입력하게 하고, 그 값을 변수에 저장할 수 있습니다.

 

중간중간에 input() 함수가 있어서 이게 뭔가 싶어 조회를 해보니, 

그냥 입력값 전달받는 함수였다.

입력하고 엔터를 누르면 되겠다.