키움 증권 API를 이용하여 주식 자동 매매 프로그램 개발
https://github.com/kdseo/PyTrader/blob/master/Kiwoom.py
import sys import os from PyQt5.QtWidgets import * from PyQt5.QAxContainer import * from PyQt5.QtCore import * from beautifultable import BeautifulTable from PyQt5.QtTest import * TR_REQ_TIME_INTERVAL = 0.2 class Kiwoom(QAxWidget): def __init__(self): self.app = QApplication(sys.argv) print("Ui_class 입니다.") super().__init__() # 이벤트 루프 관련 변수 self.login_event_loop = QEventLoop() self.account_event_loop = QEventLoop() self.calculator_event_loop = QEventLoop() # 계좌 관련 변수 self.account_number = None self.total_buy_money = None self.total_evaluation_money = None self.total_evaluation_profit_and_loss_money = None self.total_yield = None self.account_stock_dict = {} self.not_signed_account_dict = {} # 예수금 관련 변수 self.deposit = None self.withdraw_deposit = None self.order_deposit = None # 종목 분석 관련 변수 self.calculator_list = [] # 화면 번호 self.screen_my_account = "1000" self.screen_calculation_stock = "2000" # 초기 작업 print("초기작업시작") self.create_kiwoom_instance() self.event_collection() # 이벤트와 슬롯을 메모리에 먼저 생성. self.login() print("계좌정보시작") input() self.get_account_info() # 계좌 번호만 얻어오기 self.get_deposit_info() # 예수금 관련된 정보 얻어오기 self.get_account_evaluation_balance() # 계좌평가잔고내역 얻어오기 self.not_signed_account() # 미체결내역 얻어오기 #self.calculator() self.menu() print("스케쥴링시작") ###########[스케쥴링]############ self.kiwoom = QAxWidget("KHOPENAPI.KHOpenAPICtrl.1") self.kiwoom.OnEventConnect.connect(self.kiwoom_OnEventConnect) # 로그인 결과를 받을 콜백함수 연결 self.get_ocx_instance() # OCX 방식을 파이썬에 사용할 수 있게 반환해 주는 함수 실행 self.event_slots() self.signal_login_commConnect() #접속 self.get_account_info() # 계좌정보 ###########[스케쥴링]############ # COM 오브젝트 생성. def create_kiwoom_instance(self): # 레지스트리에 저장된 키움 openAPI 모듈 불러오기 self.setControl("KHOPENAPI.KHOpenAPICtrl.1") def event_collection(self): self.OnEventConnect.connect(self.login_slot) # 로그인 관련 이벤트 self.OnReceiveTrData.connect(self.tr_slot) # 트랜잭션 요청 관련 이벤트 def login(self): self.dynamicCall("CommConnect()") # 시그널 함수 호출. self.login_event_loop.exec_() def login_slot(self, err_code): if err_code == 0: print("로그인에 성공하였습니다.") else: os.system('cls') print("에러 내용 :", errors(err_code)[1]) sys.exit(0) self.login_event_loop.exit() def get_account_info(self): account_list = self.dynamicCall("GetLoginInfo(QString)", "ACCNO") account_number = account_list.split(';')[0] self.account_number = account_number def menu(self): sel = "" while True: os.system('cls') print("1. 현재 로그인 상태 확인") print("2. 사용자 정보 조회") print("3. 예수금 조회") print("4. 계좌 잔고 조회") print("5. 미체결 내역 조회") print("Q. 프로그램 종료") sel = input("=> ") if sel == "Q" or sel == "q": sys.exit(0) if sel == "1": self.print_login_connect_state() elif sel == "2": self.print_my_info() elif sel == "3": self.print_get_deposit_info() elif sel == "4": self.print_get_account_evaulation_balance_info() elif sel == "5": self.print_not_signed_account() def print_login_connect_state(self): os.system('cls') isLogin = self.dynamicCall("GetConnectState()") if isLogin == 1: print("\n현재 계정은 로그인 상태입니다.") else: print("\n현재 계정은 로그아웃 상태입니다.") input() def print_my_info(self): os.system('cls') user_name = self.dynamicCall("GetLoginInfo(QString)", "USER_NAME") user_id = self.dynamicCall("GetLoginInfo(QString)", "USER_ID") account_count = self.dynamicCall( "GetLoginInfo(QString)", "ACCOUNT_CNT") print(f"\n이름 : {user_name}") print(f"ID : {user_id}") print(f"보유 계좌 수 : {account_count}") print(f"계좌번호 : {self.account_number}") input() def print_get_deposit_info(self): os.system('cls') print(f"\n예수금 : {self.deposit}원") print(f"출금 가능 금액 : {self.withdraw_deposit}원") print(f"주문 가능 금액 : {self.order_deposit}원") input() def print_get_account_evaulation_balance_info(self): os.system('cls') print("\n<싱글 데이터>") print(f"총 매입 금액 : {self.total_buy_money}원") print(f"총 평가 금액 : {self.total_evaluation_money}원") print(f"총 평가 손익 금액 : {self.total_evaluation_profit_and_loss_money}원") print(f"총 수익률 : {self.total_yield}%\n") table = self.make_table("계좌평가잔고내역요청") print("<멀티 데이터>") if len(self.account_stock_dict) == 0: print("보유한 종목이 없습니다!") else: print(f"보유 종목 수 : {len(self.account_stock_dict)}개") print(table) input() def make_table(self, sRQName): table = BeautifulTable() table = BeautifulTable(maxwidth=150) if sRQName == "계좌평가잔고내역요청": for stock_code in self.account_stock_dict: stock = self.account_stock_dict[stock_code] stockList = [] for key in stock: output = None if key == "종목명": output = stock[key] elif key == "수익률(%)": output = str(stock[key]) + "%" elif key == "보유수량" or key == "매매가능수량": output = str(stock[key]) + "개" else: output = str(stock[key]) + "원" stockList.append(output) table.rows.append(stockList) table.columns.header = ["종목명", "평가손익", "수익률", "매입가", "보유수량", "매매가능수량", "현재가"] table.rows.sort('종목명') elif sRQName == "실시간미체결요청": for stock_order_number in self.not_signed_account_dict: stock = self.not_signed_account_dict[stock_order_number] stockList = [stock_order_number] for key in stock: output = None if key == "주문가격" or key == "현재가": output = str(stock[key]) + "원" elif '량' in key: output = str(stock[key]) + "개" elif key == "종목코드": continue else: output = stock[key] stockList.append(output) table.rows.append(stockList) table.columns.header = ["주문번호", "종목명", "주문구분", "주문가격", "주문수량", "미체결수량", "체결량", "현재가", "주문상태"] table.rows.sort('주문번호') return table def print_not_signed_account(self): os.system('cls') print() table = self.make_table("실시간미체결요청") if len(self.not_signed_account_dict) == 0: print("미체결 내역이 없습니다!") else: print(table) input() def get_deposit_info(self, nPrevNext=0): self.dynamicCall("SetInputValue(QString, QString)", "계좌번호", self.account_number) self.dynamicCall("SetInputValue(QString, QString)", "비밀번호", " ") self.dynamicCall("SetInputValue(QString, QString)", "비밀번호입력매체구분", "00") self.dynamicCall("SetInputValue(QString, QString)", "조회구분", "2") self.dynamicCall("CommRqData(QString, QString, int, QString)", "예수금상세현황요청", "opw00001", nPrevNext, self.screen_my_account) self.account_event_loop.exec_() def get_account_evaluation_balance(self, nPrevNext=0): self.dynamicCall("SetInputValue(QString, QString)", "계좌번호", self.account_number) self.dynamicCall("SetInputValue(QString, QString)", "비밀번호", " ") self.dynamicCall("SetInputValue(QString, QString)", "비밀번호입력매체구분", "00") self.dynamicCall("SetInputValue(QString, QString)", "조회구분", "1") self.dynamicCall("CommRqData(QString, QString, int, QString)", "계좌평가잔고내역요청", "opw00018", nPrevNext, self.screen_my_account) if not self.account_event_loop.isRunning(): self.account_event_loop.exec_() def not_signed_account(self, nPrevNext=0): self.dynamicCall("SetInputValue(QString, QString)", "계좌번호", self.account_number) self.dynamicCall("SetInputValue(QString, QString)", "전체종목구분", "0") self.dynamicCall("SetInputValue(QString, QString)", "매매구분", "0") self.dynamicCall("SetInputValue(QString, QString)", "체결구분", "1") self.dynamicCall("CommRqData(QString, QString, int, QString)", "실시간미체결요청", "opt10075", nPrevNext, self.screen_my_account) if not self.account_event_loop.isRunning(): self.account_event_loop.exec_() def tr_slot(self, sScrNo, sRQName, sTrCode, sRecordName, sPrevNext): if sRQName == "예수금상세현황요청": deposit = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "예수금") self.deposit = int(deposit) withdraw_deposit = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "출금가능금액") self.withdraw_deposit = int(withdraw_deposit) order_deposit = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "주문가능금액") self.order_deposit = int(order_deposit) self.cancel_screen_number(self.screen_my_account) self.account_event_loop.exit() elif sRQName == "계좌평가잔고내역요청": if (self.total_buy_money == None or self.total_evaluation_money == None or self.total_evaluation_profit_and_loss_money == None or self.total_yield == None): total_buy_money = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "총매입금액") self.total_buy_money = int(total_buy_money) total_evaluation_money = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "총평가금액") self.total_evaluation_money = int(total_evaluation_money) total_evaluation_profit_and_loss_money = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "총평가손익금액") self.total_evaluation_profit_and_loss_money = int( total_evaluation_profit_and_loss_money) total_yield = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "총수익률(%)") self.total_yield = float(total_yield) cnt = self.dynamicCall( "GetRepeatCnt(QString, QString)", sTrCode, sRQName) for i in range(cnt): stock_code = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "종목번호") stock_code = stock_code.strip()[1:] stock_name = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "종목명") stock_name = stock_name.strip() # 필요 없는 공백 제거. stock_evaluation_profit_and_loss = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "평가손익") stock_evaluation_profit_and_loss = int( stock_evaluation_profit_and_loss) stock_yield = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "수익률(%)") stock_yield = float(stock_yield) stock_buy_money = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "매입가") stock_buy_money = int(stock_buy_money) stock_quantity = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "보유수량") stock_quantity = int(stock_quantity) stock_trade_quantity = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "매매가능수량") stock_trade_quantity = int(stock_trade_quantity) stock_present_price = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "현재가") stock_present_price = int(stock_present_price) if not stock_code in self.account_stock_dict: self.account_stock_dict[stock_code] = {} self.account_stock_dict[stock_code].update({'종목명': stock_name}) self.account_stock_dict[stock_code].update( {'평가손익': stock_evaluation_profit_and_loss}) self.account_stock_dict[stock_code].update( {'수익률(%)': stock_yield}) self.account_stock_dict[stock_code].update( {'매입가': stock_buy_money}) self.account_stock_dict[stock_code].update( {'보유수량': stock_quantity}) self.account_stock_dict[stock_code].update( {'매매가능수량': stock_trade_quantity}) self.account_stock_dict[stock_code].update( {'현재가': stock_present_price}) if sPrevNext == "2": self.get_account_evaluation_balance(2) else: self.cancel_screen_number(self.screen_my_account) self.account_event_loop.exit() elif sRQName == "실시간미체결요청": cnt = self.dynamicCall( "GetRepeatCnt(QString, QString)", sTrCode, sRQName) for i in range(cnt): stock_code = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "종목코드") stock_code = stock_code.strip() stock_order_number = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "주문번호") stock_order_number = int(stock_order_number) stock_name = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "종목명") stock_name = stock_name.strip() stock_order_type = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "주문구분") stock_order_type = stock_order_type.strip().lstrip('+').lstrip('-') stock_order_price = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "주문가격") stock_order_price = int(stock_order_price) stock_order_quantity = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "주문수량") stock_order_quantity = int(stock_order_quantity) stock_not_signed_quantity = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "미체결수량") stock_not_signed_quantity = int(stock_not_signed_quantity) stock_signed_quantity = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "체결량") stock_signed_quantity = int(stock_signed_quantity) stock_present_price = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "현재가") stock_present_price = int( stock_present_price.strip().lstrip('+').lstrip('-')) stock_order_status = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "주문상태") stock_order_status = stock_order_status.strip() if not stock_order_number in self.not_signed_account_dict: self.not_signed_account_dict[stock_order_number] = {} self.not_signed_account_dict[stock_order_number].update( {'종목코드': stock_code}) self.not_signed_account_dict[stock_order_number].update( {'종목명': stock_name}) self.not_signed_account_dict[stock_order_number].update( {'주문구분': stock_order_type}) self.not_signed_account_dict[stock_order_number].update( {'주문가격': stock_order_price}) self.not_signed_account_dict[stock_order_number].update( {'주문수량': stock_order_quantity}) self.not_signed_account_dict[stock_order_number].update( {'미체결수량': stock_not_signed_quantity}) self.not_signed_account_dict[stock_order_number].update( {'체결량': stock_signed_quantity}) self.not_signed_account_dict[stock_order_number].update( {'현재가': stock_present_price}) self.not_signed_account_dict[stock_order_number].update( {'주문상태': stock_order_status}) if sPrevNext == "2": self.not_signed_account(2) else: self.cancel_screen_number(sScrNo) self.account_event_loop.exit() elif sRQName == "주식일봉차트조회요청": stock_code = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, 0, "종목코드") #six_hundred_data = self.dynamicCall("GetCommDataEx(QString, QString)", sTrCode, sRQName) stock_code = stock_code.strip() cnt = self.dynamicCall( "GetRepeatCnt(QString, QString)", sTrCode, sRQName) # 최대 600일 for i in range(cnt): calculator_list = [] current_price = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "현재가") volume = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "거래량") trade_price = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "거래대금") date = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "일자") start_price = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "시가") high_price = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "고가") low_price = self.dynamicCall( "GetCommData(QString, QString, int, QString)", sTrCode, sRQName, i, "저가") calculator_list.append("") calculator_list.append(int(current_price)) calculator_list.append(int(volume)) calculator_list.append(int(trade_price)) calculator_list.append(int(date)) calculator_list.append(int(start_price)) calculator_list.append(int(high_price)) calculator_list.append(int(low_price)) calculator_list.append("") self.calculator_list.append(calculator_list.copy()) if sPrevNext == "2": self.day_kiwoom_db(stock_code, None, 2) else: self.calculator_event_loop.exit() def cancel_screen_number(self, sScrNo): self.dynamicCall("DisconnectRealData(QString)", sScrNo) def get_code_list_by_market(self, market_code): code_list = self.dynamicCall( "GetCodeListByMarket(QString)", market_code) code_list = code_list.split(";")[:-1] return code_list def calculator(self): kosdaq_list = self.get_code_list_by_market("10") for idx, stock_code in enumerate(kosdaq_list): self.dynamicCall("DisconnectRealData(QString)", self.screen_calculation_stock) print( f"{idx + 1} / {len(kosdaq_list)} : KOSDAQ Stock Code : {stock_code} is updating...") self.day_kiwoom_db(stock_code) def day_kiwoom_db(self, stock_code=None, date=None, nPrevNext=0): QTest.qWait(3600) # 3.6초마다 딜레이 self.dynamicCall("SetInputValue(QString, QString)", "종목코드", stock_code) self.dynamicCall("SetInputValue(QString, QString)", "수정주가구분", 1) if date != None: # date가 None일 경우 date는 오늘 날짜 기준 self.dynamicCall("SetInputValue(QString, QString)", "기준일자", date) self.dynamicCall("CommRqData(QString, QString, int, QString)", "주식일봉차트조회요청", "opt10081", nPrevNext, self.screen_calculation_stock) if not self.calculator_event_loop.isRunning(): self.calculator_event_loop.exec_() def errors(err_code): err_dic = {0: ('OP_ERR_NONE', '정상처리'), -10: ('OP_ERR_FAIL', '실패'), -100: ('OP_ERR_LOGIN', '사용자정보교환실패'), -101: ('OP_ERR_CONNECT', '서버접속실패'), -102: ('OP_ERR_VERSION', '버전처리실패'), -103: ('OP_ERR_FIREWALL', '개인방화벽실패'), -104: ('OP_ERR_MEMORY', '메모리보호실패'), -105: ('OP_ERR_INPUT', '함수입력값오류'), -106: ('OP_ERR_SOCKET_CLOSED', '통신연결종료'), -200: ('OP_ERR_SISE_OVERFLOW', '시세조회과부하'), -201: ('OP_ERR_RQ_STRUCT_FAIL', '전문작성초기화실패'), -202: ('OP_ERR_RQ_STRING_FAIL', '전문작성입력값오류'), -203: ('OP_ERR_NO_DATA', '데이터없음'), -204: ('OP_ERR_OVER_MAX_DATA', '조회가능한종목수초과'), -205: ('OP_ERR_DATA_RCV_FAIL', '데이터수신실패'), -206: ('OP_ERR_OVER_MAX_FID', '조회가능한FID수초과'), -207: ('OP_ERR_REAL_CANCEL', '실시간해제오류'), -300: ('OP_ERR_ORD_WRONG_INPUT', '입력값오류'), -301: ('OP_ERR_ORD_WRONG_ACCTNO', '계좌비밀번호없음'), -302: ('OP_ERR_OTHER_ACC_USE', '타인계좌사용오류'), -303: ('OP_ERR_MIS_2BILL_EXC', '주문가격이20억원을초과'), -304: ('OP_ERR_MIS_5BILL_EXC', '주문가격이50억원을초과'), -305: ('OP_ERR_MIS_1PER_EXC', '주문수량이총발행주수의1 % 초과오류'), -306: ('OP_ERR_MIS_3PER_EXC', '주문수량은총발행주수의3 % 초과오류'), -307: ('OP_ERR_SEND_FAIL', '주문전송실패'), -308: ('OP_ERR_ORD_OVERFLOW', '주문전송과부하'), -309: ('OP_ERR_MIS_300CNT_EXC', '주문수량300계약초과'), -310: ('OP_ERR_MIS_500CNT_EXC', '주문수량500계약초과'), -340: ('OP_ERR_ORD_WRONG_ACCTINFO', '계좌정보없음'), -500: ('OP_ERR_ORD_SYMCODE_EMPTY', '종목코드없음') } result = err_dic[err_code] return result ######################## 시작 if __name__=="__main__": kiwoom = Kiwoom()
위의 소스를 가지고 하나씩 뜯어봐야겠다.
input() 함수로 사용자가 어떤 값을 입력하게 하고, 그 값을 변수에 저장할 수 있습니다.
중간중간에 input() 함수가 있어서 이게 뭔가 싶어 조회를 해보니,
그냥 입력값 전달받는 함수였다.
입력하고 엔터를 누르면 되겠다.