## $Id: helpers.py 17233 2022-12-21 10:21:27Z henrik $
##
## Copyright (C) 2012 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
"""General helper functions for the interswitch module in custom packages.
"""
from datetime import datetime
from ssl import SSLError
import httplib
import hashlib
import json
from urllib import urlencode
import grok
from xml.dom.minidom import parseString
from zope.event import notify
from waeup.kofa.payments.interfaces import IPayer
from kofacustom.nigeria.interfaces import MessageFactory as _
def SOAP_post(soap_action, xml, host, url, https):
"""Handles making the SOAP request.
"""
if https:
h = httplib.HTTPSConnection(host)
else:
h = httplib.HTTPConnection(host)
headers={
'Host':host,
'Content-Type':'text/xml; charset=utf-8',
'Content-Length':len(xml),
'SOAPAction':'"%s"' % soap_action,
}
h.request('POST', url, body=xml,headers=headers)
response = h.getresponse()
return response
def write_payments_log(id, payment):
payment.logger.info(
'%s,%s,%s,%s,%s,%s,%s,%s,,,' % (
id, payment.p_id, payment.p_category,
payment.amount_auth, payment.r_code,
payment.provider_amt, payment.gateway_amt,
payment.thirdparty_amt))
# CollegePAY helper functions
def get_SOAP_response(product_id, transref, host, url, https):
xml="""\
%s
%s
""" % (product_id, transref)
response=SOAP_post("http://tempuri.org/getTransactionData",xml, host, url, https)
if response.status!=200:
return 'Connection error (%s, %s)' % (response.status, response.reason)
result_xml = response.read()
doc=parseString(result_xml)
response=doc.getElementsByTagName('getTransactionDataResult')[0].firstChild.data
return response
def get_JSON_response(product_id, transref, host, url, https, mac, amount):
hashargs = product_id + transref + mac
hashvalue = hashlib.sha512(hashargs).hexdigest()
headers={
'Content-Type':'text/xml; charset=utf-8',
'Hash':hashvalue,
}
if https:
h = httplib.HTTPSConnection(host)
else:
h = httplib.HTTPConnection(host)
amount = int(100 * amount)
args = {'productid': product_id,
'transactionreference': transref,
'amount': amount}
url = '%s?' % url + urlencode(args)
try:
h.request("GET", url, headers=headers)
except SSLError:
return {'error': 'SSL handshake error'}
response = h.getresponse()
if response.status!=200:
return {'error': 'Connection error (%s, %s)' % (response.status, response.reason)}
jsonout = response.read()
parsed_json = json.loads(jsonout)
return parsed_json
def query_interswitch_SOAP(payment, product_id, host, url, https, verify):
sr = get_SOAP_response(product_id, payment.p_id, host, url, https)
if sr.startswith('Connection error'):
msg = _('Connection error')
log = sr
return False, msg, log
wlist = sr.split(':')
if len(wlist) < 7:
msg = _('Invalid callback: ${a}', mapping = {'a': sr})
log = 'invalid callback for payment %s: %s' % (payment.p_id, sr)
return False, msg, log
payment.r_code = wlist[0]
payment.r_desc = wlist[1]
payment.r_amount_approved = float(wlist[2]) / 100
payment.r_card_num = wlist[3]
payment.r_pay_reference = wlist[5]
payment.r_company = u'interswitch'
if payment.r_code != '00':
msg = _('Unsuccessful callback: ${a}', mapping = {'a': sr})
log = 'unsuccessful callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, sr)
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
if round(payment.r_amount_approved, 0) != round(payment.amount_auth, 0):
msg = _('Callback amount does not match.')
log = 'wrong callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, sr)
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
if wlist[4] != payment.p_id:
msg = _('Callback transaction id does not match.')
log = 'wrong callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, sr)
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
payment.p_state = 'paid'
if not verify:
payment.payment_date = datetime.utcnow()
msg = _('Successful callback received.')
log = 'valid callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, sr)
notify(grok.ObjectModifiedEvent(payment))
return True, msg, log
def query_interswitch(payment, product_id, host, url, https, mac, verify):
# If no mac mac key is given, fall back to deprecated SOAP method
# (Uniben, AAUA, FCEOkene).
if mac == None:
return query_interswitch_SOAP(
payment, product_id, host, url, https, verify)
jr = get_JSON_response(product_id, payment.p_id, host, url,
https, mac, payment.amount_auth)
error = jr.get('error')
if error:
msg = log = error
return False, msg, log
# A typical JSON response
# old:
# {u'SplitAccounts': [],
# u'MerchantReference':u'p4210665523377',
# u'PaymentReference':u'GTB|WEB|KPOLY|12-01-2015|013138',
# u'TransactionDate':u'2015-01-12T13:43:39.27',
# u'RetrievalReferenceNumber':u'000170548791',
# u'ResponseDescription': u'Approved Successful',
# u'Amount': 2940000,
# u'CardNumber': u'2507',
# u'ResponseCode': u'00',
# u'LeadBankCbnCode': None,
# u'LeadBankName': None}
# new:
# 'PaymentReference' is maybe missing
# {u'SplitAccounts': [],
# u'MerchantReference':u'p5918633006916',
# u'TransactionDate':u'2020-06-11T09:17:37',
# u'ResponseDescription':u'Customer Cancellation',
# u'Amount': 89525000,
# u'CardNumber': u'',
# u'ResponseCode': u'Z6',
# u'BankCode': u''}
if not 'ResponseCode' in jr.keys() \
or not 'ResponseDescription' in jr.keys() \
or not 'Amount' in jr.keys():
msg = _('Invalid callback: ${a}', mapping = {'a': str(jr)})
log = 'invalid callback for payment %s: %s' % (payment.p_id, str(jr))
return False, msg, log
if verify and jr['ResponseCode'] == '20050':
msg = _('Integration method has changed.')
log = 'invalid callback for payment %s: %s' % (payment.p_id, str(jr))
return False, msg, log
payment.r_code = jr['ResponseCode']
payment.r_desc = jr['ResponseDescription']
payment.r_amount_approved = jr['Amount'] / 100.0
payment.r_card_num = jr.get('CardNumber', u'')
payment.r_pay_reference = jr.get('PaymentReference', u'')
#payment.r_company = u'interswitch'
if payment.r_code != '00':
msg = _('Unsuccessful callback: ${a}', mapping = {'a': payment.r_desc})
log = 'unsuccessful callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, payment.r_desc)
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
if round(payment.r_amount_approved, 0) != round(payment.amount_auth, 0):
msg = _('Callback amount does not match.')
log = 'wrong callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, str(jr))
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
if jr['MerchantReference'] != payment.p_id:
msg = _('Callback transaction id does not match.')
log = 'wrong callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, str(jr))
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
payment.p_state = 'paid'
if not verify:
payment.payment_date = datetime.utcnow()
msg = _('Successful callback received')
log = 'valid callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, str(jr))
notify(grok.ObjectModifiedEvent(payment))
return True, msg, log
# PAYDirect helper functions
def create_paydirect_booking(merchant_id, payment, item_code, host, url, https):
p_id = payment.p_id
description = payment.p_category
amount = int(100*payment.amount_auth) # Amount in Kobo
date_booked = payment.creation_date.strftime("%Y-%m-%d")
date_expired = "2099-12-31"
firstname = IPayer(payment).display_fullname.split()[0]
lastname = IPayer(payment).display_fullname.split()[-1]
id = IPayer(payment).id
email = IPayer(payment).email
xml="""\
%s
%s%s
%s
%s
%s
%s
%s
%s
%s
%s
""" % (
merchant_id, merchant_id, p_id[1:],
description, amount,
date_booked, date_expired,
firstname, lastname,
email, item_code)
response=SOAP_post(
"http://interswitchng.com/bookonhold/CreateBooking",
xml, host, url, https)
if response.status!=200:
error = 'Connection error (%s, %s)' % (response.status, response.reason)
return error
result_xml = response.read()
return result_xml
def get_SOAP_response_paydirect(merchant_id, p_id, host, url, https):
xml="""\
%s
%s%s
""" % (merchant_id, merchant_id, p_id[1:])
response=SOAP_post(
"http://interswitchng.com/bookonhold/FetchBookingDetails",
xml, host, url, https)
if response.status!=200:
error = 'Connection error (%s, %s)' % (response.status, response.reason)
return error
result_xml = response.read()
return result_xml
def fetch_booking_details(payment, merchant_id, host, url, https):
result_xml = get_SOAP_response_paydirect(
merchant_id, payment.p_id, host, url, https)
if result_xml.startswith('Connection error'):
return False, result_xml, result_xml
doc=parseString(result_xml)
if not doc.getElementsByTagName('PaymentStatus'):
msg = _('Your payment %s was not found.' % payment.p_id)
log = 'payment %s cannot be found' % payment.p_id
return False, msg, log
p_status = doc.getElementsByTagName('PaymentStatus')[0].firstChild.data
payment.r_code = p_status
try:
payment.r_desc = "Channel Name: %s - Terminal Id: %s - Location: %s" % (
doc.getElementsByTagName('ChannelName')[0].firstChild.data,
doc.getElementsByTagName('TerminalId')[0].firstChild.data,
doc.getElementsByTagName('Location')[0].firstChild.data)
except AttributeError:
pass
try:
amount = doc.getElementsByTagName('Amount')[0].firstChild.data
payment.r_amount_approved = float(amount) / 100
except AttributeError:
pass
try:
payment.r_pay_reference = doc.getElementsByTagName(
'ReferenceNumber')[0].firstChild.data
except AttributeError:
pass
if p_status not in ('Pending', 'Completed'):
msg = _('Unknown status: %s' % p_status)
log = 'invalid callback for payment %s: %s' % (payment.p_id, p_status)
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
if p_status == 'Completed' and not payment.r_amount_approved:
msg = _('Amount unconfirmed')
log = 'unsuccessful callback for payment %s: amount unconfirmed' % payment.p_id
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
if p_status == 'Pending':
msg = _('Payment pending')
log = 'unsuccessful callback for payment %s: pending' % payment.p_id
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
if payment.r_amount_approved != payment.amount_auth:
msg = _('Callback amount does not match net amount.')
log = 'unsuccessful callback for %s payment %s: callback amount %s does not match' % (
payment.p_category, payment.p_id, amount)
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
payment.p_state = 'paid'
payment.payment_date = datetime.utcnow()
msg = _('Successful callback received')
log = 'valid callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, p_status)
notify(grok.ObjectModifiedEvent(payment))
return True, msg, log
# Web checkout helper functions
def get_JSON_webcheckout_response(merchant_code, transref, host, url,
https, amount, mac=''):
amount = int(100 * amount)
hashargs = transref + merchant_code + str(amount) + mac
hashvalue = hashlib.sha512(hashargs).hexdigest()
headers={
'Content-Type':'text/xml; charset=utf-8',
'Hash':hashvalue,
}
if https:
h = httplib.HTTPSConnection(host)
else:
h = httplib.HTTPConnection(host)
args = {'merchantcode': merchant_code,
'transactionreference': transref,
'amount': amount}
url = '%s?' % url + urlencode(args)
try:
h.request("GET", url, headers=headers)
except SSLError:
return {'error': 'SSL handshake error'}
response = h.getresponse()
if response.status!=200:
return {'error': 'Connection error (%s, %s)' % (
response.status, response.reason)}
jsonout = response.read()
parsed_json = json.loads(jsonout)
return parsed_json
def confirm_transaction(payment, merchant_code, host, url, https, mac):
jr = get_JSON_webcheckout_response(merchant_code, payment.p_id, host, url,
https, payment.amount_auth, mac)
error = jr.get('error')
if error:
msg = log = error
return False, msg, log
# A typical JSON response (test payment of Hector)
#{u'SplitAccounts': [],
#u'RemittanceAmount': 0,
#u'MerchantReference': u'p6709347986663',
#u'PaymentReference': u'FBN|WEB|MX76823|13-12-2022|935097929|608001',
#u'TransactionDate': u'2022-12-13T01:34:21',
#u'RetrievalReferenceNumber': u'814212374638',
#u'ResponseDescription': u'Approved by Financial Institution',
#u'Amount': 10000,
#u'CardNumber': u'',
#u'ResponseCode': u'00',
#u'BankCode': u'011'}
if not 'ResponseCode' in jr.keys() \
or not 'ResponseDescription' in jr.keys() \
or not 'Amount' in jr.keys():
msg = _('Invalid callback: ${a}', mapping = {'a': str(jr)})
log = 'invalid callback for payment %s: %s' % (payment.p_id, str(jr))
return False, msg, log
payment.r_code = jr['ResponseCode']
payment.r_desc = jr['ResponseDescription']
payment.r_amount_approved = jr['Amount'] / 100.0
payment.r_card_num = jr.get('CardNumber', u'')
payment.r_pay_reference = jr.get('PaymentReference', u'')
#payment.r_company = u'interswitch'
if payment.r_code != '00':
msg = _('Unsuccessful callback: ${a}', mapping = {'a': payment.r_desc})
log = 'unsuccessful callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, payment.r_desc)
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
if round(payment.r_amount_approved, 0) != round(payment.amount_auth, 0):
msg = _('Callback amount does not match.')
log = 'wrong callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, str(jr))
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
if jr['MerchantReference'] != payment.p_id:
msg = _('Callback transaction id does not match.')
log = 'wrong callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, str(jr))
payment.p_state = 'failed'
notify(grok.ObjectModifiedEvent(payment))
return False, msg, log
payment.p_state = 'paid'
msg = _('Successful callback received')
log = 'valid callback for %s payment %s: %s' % (
payment.p_category, payment.p_id, str(jr))
notify(grok.ObjectModifiedEvent(payment))
return True, msg, log